summaryrefslogtreecommitdiff
path: root/TAO/tao/GIOP_Message_Base.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/tao/GIOP_Message_Base.cpp')
-rw-r--r--TAO/tao/GIOP_Message_Base.cpp706
1 files changed, 109 insertions, 597 deletions
diff --git a/TAO/tao/GIOP_Message_Base.cpp b/TAO/tao/GIOP_Message_Base.cpp
index 77a379e1a10..aa0efbb8b97 100644
--- a/TAO/tao/GIOP_Message_Base.cpp
+++ b/TAO/tao/GIOP_Message_Base.cpp
@@ -1,17 +1,17 @@
// $Id$
-#include "tao/GIOP_Message_Base.h"
-#include "tao/operation_details.h"
-#include "tao/debug.h"
-#include "tao/ORB_Core.h"
-#include "tao/TAO_Server_Request.h"
-#include "tao/GIOP_Message_Locate_Header.h"
-#include "tao/Transport.h"
-#include "tao/Transport_Mux_Strategy.h"
-#include "tao/LF_Strategy.h"
-#include "tao/Request_Dispatcher.h"
-#include "tao/Codeset_Manager.h"
-#include "tao/SystemException.h"
+#include "GIOP_Message_Base.h"
+#include "operation_details.h"
+#include "debug.h"
+#include "ORB_Core.h"
+#include "TAO_Server_Request.h"
+#include "GIOP_Message_Locate_Header.h"
+#include "Transport.h"
+#include "Transport_Mux_Strategy.h"
+#include "LF_Strategy.h"
+#include "Request_Dispatcher.h"
+#include "Codeset_Manager.h"
+#include "SystemException.h"
/*
* Hook to add additional include files during specializations.
@@ -22,24 +22,22 @@ ACE_RCSID (tao,
GIOP_Message_Base,
"$Id$")
-TAO_BEGIN_VERSIONED_NAMESPACE_DECL
-
-TAO_GIOP_Message_Base::TAO_GIOP_Message_Base (TAO_ORB_Core * orb_core,
- size_t /* input_cdr_size */)
+TAO_GIOP_Message_Base::TAO_GIOP_Message_Base (TAO_ORB_Core *orb_core,
+ size_t /*input_cdr_size*/)
: orb_core_ (orb_core)
- , message_state_ ()
- , out_stream_ (this->buffer_,
- sizeof this->buffer_, /* ACE_CDR::DEFAULT_BUFSIZE */
- TAO_ENCAP_BYTE_ORDER,
- orb_core->output_cdr_buffer_allocator (),
- orb_core->output_cdr_dblock_allocator (),
- orb_core->output_cdr_msgblock_allocator (),
- orb_core->orb_params ()->cdr_memcpy_tradeoff (),
- TAO_DEF_GIOP_MAJOR,
- TAO_DEF_GIOP_MINOR)
+ , message_state_ ()
+ , out_stream_ (this->buffer_,
+ sizeof this->buffer_, /* ACE_CDR::DEFAULT_BUFSIZE */
+ TAO_ENCAP_BYTE_ORDER,
+ orb_core->output_cdr_buffer_allocator (),
+ orb_core->output_cdr_dblock_allocator (),
+ orb_core->output_cdr_msgblock_allocator (),
+ orb_core->orb_params ()->cdr_memcpy_tradeoff (),
+ TAO_DEF_GIOP_MAJOR,
+ TAO_DEF_GIOP_MINOR)
{
#if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
- ACE_OS::memset (this->buffer_, 0, sizeof (buffer_));
+ ACE_OS::memset(buffer_, 0, sizeof (buffer_));
#endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */
}
@@ -198,7 +196,7 @@ TAO_GIOP_Message_Base::generate_reply_header (
ACE_TRY
{
// Now call the implementation for the rest of the header
- int const result =
+ int result =
generator_parser->write_reply_header (cdr,
params
ACE_ENV_ARG_PARAMETER);
@@ -218,7 +216,7 @@ TAO_GIOP_Message_Base::generate_reply_header (
{
if (TAO_debug_level > 4)
ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
- ACE_TEXT ("TAO_GIOP_Message_Base::generate_reply_header"));
+ "TAO_GIOP_Message_Base::generate_reply_header");
return -1;
}
@@ -227,6 +225,15 @@ TAO_GIOP_Message_Base::generate_reply_header (
return 0;
}
+
+int
+TAO_GIOP_Message_Base::read_message (TAO_Transport * /*transport*/,
+ int /*block */,
+ ACE_Time_Value * /*max_wait_time*/)
+{
+ return 0;
+}
+
int
TAO_GIOP_Message_Base::format_message (TAO_OutputCDR &stream)
{
@@ -234,7 +241,7 @@ TAO_GIOP_Message_Base::format_message (TAO_OutputCDR &stream)
char *buf = (char *) stream.buffer ();
// Length of all buffers.
- size_t const total_len =
+ size_t total_len =
stream.total_length ();
// NOTE: Here would also be a fine place to calculate a digital
@@ -244,7 +251,7 @@ TAO_GIOP_Message_Base::format_message (TAO_OutputCDR &stream)
// this particular environment and that isn't handled by the
// networking infrastructure (e.g., IPSEC).
- CORBA::ULong const bodylen = static_cast <CORBA::ULong>
+ CORBA::ULong bodylen = static_cast <CORBA::ULong>
(total_len - TAO_GIOP_MESSAGE_HEADER_LEN);
#if !defined (ACE_ENABLE_SWAP_ON_WRITE)
@@ -300,7 +307,6 @@ TAO_GIOP_Message_Base::message_type (
case TAO_GIOP_LOCATEREPLY:
return TAO_PLUGGABLE_MESSAGE_LOCATEREPLY;
-
case TAO_GIOP_REPLY:
return TAO_PLUGGABLE_MESSAGE_REPLY;
@@ -311,19 +317,13 @@ TAO_GIOP_Message_Base::message_type (
return TAO_PLUGGABLE_MESSAGE_FRAGMENT;
case TAO_GIOP_MESSAGERROR:
- return TAO_PLUGGABLE_MESSAGE_MESSAGERROR;
-
case TAO_GIOP_CANCELREQUEST:
- return TAO_PLUGGABLE_MESSAGE_CANCELREQUEST;
-
+ // Does it happen? why??
default:
- if (TAO_debug_level > 0)
- {
ACE_ERROR ((LM_ERROR,
ACE_TEXT ("TAO (%P|%t) %N:%l message_type : ")
ACE_TEXT ("wrong message.\n")));
}
- }
return TAO_PLUGGABLE_MESSAGE_MESSAGERROR;
}
@@ -331,51 +331,32 @@ TAO_GIOP_Message_Base::message_type (
int
TAO_GIOP_Message_Base::parse_incoming_messages (ACE_Message_Block &incoming)
{
- this->message_state_.reset ();
-
return this->message_state_.parse_message_header (incoming);
}
-int
-TAO_GIOP_Message_Base::parse_next_message (ACE_Message_Block &incoming,
- TAO_Queued_Data &qd,
- size_t &mesg_length)
+ssize_t
+TAO_GIOP_Message_Base::missing_data (ACE_Message_Block &incoming)
{
- if (incoming.length () < TAO_GIOP_MESSAGE_HEADER_LEN)
- {
- qd.missing_data_ = TAO_MISSING_DATA_UNDEFINED;
-
- return 0; /* incomplete header */
- }
- else
- {
- TAO_GIOP_Message_State state;
+ // Actual message size including the header..
+ CORBA::ULong msg_size =
+ this->message_state_.message_size ();
- if (state.parse_message_header (incoming) == -1)
- {
- return -1;
- }
-
- const size_t message_size = state.message_size (); /* Header + Payload */
-
- if (message_size > incoming.length ())
- {
- qd.missing_data_ = message_size - incoming.length ();
- }
- else
- {
- qd.missing_data_ = 0;
- }
-
- /* init out-parameters */
- this->init_queued_data (&qd, state);
- mesg_length = TAO_GIOP_MESSAGE_HEADER_LEN
- + state.payload_size ();
+ size_t len = incoming.length ();
- return 1; /* complete header */
+ // If we have too many messages or if we have less than even a size
+ // of the GIOP header then ..
+ if (len > msg_size ||
+ len < TAO_GIOP_MESSAGE_HEADER_LEN)
+ {
+ return -1;
}
+ else if (len == msg_size)
+ return 0;
+
+ return msg_size - len;
}
+
int
TAO_GIOP_Message_Base::extract_next_message (ACE_Message_Block &incoming,
TAO_Queued_Data *&qd)
@@ -384,45 +365,15 @@ TAO_GIOP_Message_Base::extract_next_message (ACE_Message_Block &incoming,
{
if (incoming.length () > 0)
{
- // Optimize memory usage, we dont know actual message size
- // so far, but allocate enough space to hold small GIOP
- // messages. This way we avoid expensive "grow" operation
- // for small messages.
- const size_t default_buf_size = ACE_CDR::DEFAULT_BUFSIZE;
-
- // Make a node which has at least message block of the size
- // of MESSAGE_HEADER_LEN.
- const size_t buf_size = ace_max (TAO_GIOP_MESSAGE_HEADER_LEN,
- default_buf_size);
-
- // POST: buf_size >= TAO_GIOP_MESSAGE_HEADER_LEN
-
- qd = this->make_queued_data (buf_size);
-
- if (qd == 0)
- {
- if (TAO_debug_level > 0)
- {
- ACE_ERROR((LM_ERROR,
- ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::extract_next_message, ")
- ACE_TEXT ("out of memory\n")));
- }
- return -1;
- }
+ // Make a node which has a message block of the size of
+ // MESSAGE_HEADER_LEN.
+ qd =
+ this->make_queued_data (TAO_GIOP_MESSAGE_HEADER_LEN);
qd->msg_block_->copy (incoming.rd_ptr (),
incoming.length ());
-
- incoming.rd_ptr (incoming.length ()); // consume all available data
-
- qd->missing_data_ = TAO_MISSING_DATA_UNDEFINED;
+ qd->missing_data_ = -1;
}
- else
- {
- // handle not initialized variables
- qd = 0; // reset
- }
-
return 0;
}
@@ -436,26 +387,12 @@ TAO_GIOP_Message_Base::extract_next_message (ACE_Message_Block &incoming,
qd = this->make_queued_data (copying_len);
- if (qd == 0)
- {
- if (TAO_debug_level > 0)
- {
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::extract_next_message, ")
- ACE_TEXT ("out of memory\n")));
- }
- return -1;
- }
-
if (copying_len > incoming.length ())
{
qd->missing_data_ = copying_len - incoming.length ();
+
copying_len = incoming.length ();
}
- else
- {
- qd->missing_data_ = 0;
- }
qd->msg_block_->copy (incoming.rd_ptr (),
copying_len);
@@ -471,71 +408,35 @@ TAO_GIOP_Message_Base::consolidate_node (TAO_Queued_Data *qd,
ACE_Message_Block &incoming)
{
// Look to see whether we had atleast parsed the GIOP header ...
- if (qd->missing_data_ == TAO_MISSING_DATA_UNDEFINED)
+ if (qd->missing_data_ == -1)
{
// The data length that has been stuck in there during the last
// read ....
- size_t const len =
+ size_t len =
qd->msg_block_->length ();
- // paranoid check
- if (len >= TAO_GIOP_MESSAGE_HEADER_LEN)
- {
- // inconsistency - this code should have parsed the header
- // so far
- return -1;
- }
-
// We know that we would have space for
// TAO_GIOP_MESSAGE_HEADER_LEN here. So copy that much of data
// from the <incoming> into the message block in <qd>
- const size_t available = incoming.length ();
- const size_t desired = TAO_GIOP_MESSAGE_HEADER_LEN - len;
- const size_t n_copy = ace_min (available, desired);
-
- // paranoid check, but would cause endless looping
- if (n_copy == 0)
- {
- return -1;
- }
-
- if (qd->msg_block_->copy (incoming.rd_ptr (),
- n_copy) == -1)
- {
- return -1;
- }
+ qd->msg_block_->copy (incoming.rd_ptr (),
+ TAO_GIOP_MESSAGE_HEADER_LEN - len);
// Move the rd_ptr () in the incoming message block..
- incoming.rd_ptr (n_copy);
-
- // verify sufficient data to parse GIOP header
- if (qd->msg_block_->length () < TAO_GIOP_MESSAGE_HEADER_LEN)
- {
- return 0; /* continue */
- }
+ incoming.rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN - len);
TAO_GIOP_Message_State state;
// Parse the message header now...
if (state.parse_message_header (*qd->msg_block_) == -1)
+ return -1;
+
+ // Now grow the message block so that we can copy the rest of
+ // the data...
+ if (qd->msg_block_->space () < state.message_size ())
{
- if (TAO_debug_level > 0)
- {
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::consolidate_node, ")
- ACE_TEXT ("error parsing header\n") ));
- }
- return -1;
+ ACE_CDR::grow (qd->msg_block_,
+ state.message_size ());
}
- // Now grow the message block so that we can copy the rest of
- // the data, the message_block must be able to hold complete message
- if (ACE_CDR::grow (qd->msg_block_,
- state.message_size ()) == -1) /* GIOP_Header + Payload */
- {
- // on mem-error get rid of context silently, try to avoid
- // system calls that might allocate additional memory
- return -1;
- }
// Copy the pay load..
// Calculate the bytes that needs to be copied in the queue...
@@ -558,11 +459,8 @@ TAO_GIOP_Message_Base::consolidate_node (TAO_Queued_Data *qd,
// ..now we are set to copy the right amount of data to the
// node..
- if (qd->msg_block_->copy (incoming.rd_ptr (),
- copy_len) == -1)
- {
- return -1;
- }
+ qd->msg_block_->copy (incoming.rd_ptr (),
+ copy_len);
// Set the <rd_ptr> of the <incoming>..
incoming.rd_ptr (copy_len);
@@ -584,19 +482,10 @@ TAO_GIOP_Message_Base::consolidate_node (TAO_Queued_Data *qd,
copy_len = incoming.length ();
}
- // paranoid check for endless-event-looping
- if (copy_len == 0)
- {
- return -1;
- }
-
// Copy the right amount of data in to the node...
// node..
- if (qd->msg_block_->copy (incoming.rd_ptr (),
- copy_len) == -1)
- {
- return -1;
- }
+ qd->msg_block_->copy (incoming.rd_ptr (),
+ copy_len);
// Set the <rd_ptr> of the <incoming>..
qd->msg_block_->rd_ptr (copy_len);
@@ -606,6 +495,16 @@ TAO_GIOP_Message_Base::consolidate_node (TAO_Queued_Data *qd,
return 0;
}
+void
+TAO_GIOP_Message_Base::get_message_data (TAO_Queued_Data *qd)
+{
+ // Get the message information
+ this->init_queued_data (qd, this->message_state_);
+
+ // Reset the message_state
+ this->message_state_.reset ();
+}
+
int
TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport,
TAO_Queued_Data *qd)
@@ -650,7 +549,7 @@ TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport,
// Get the read and write positions before we steal data.
size_t rd_pos = qd->msg_block_->rd_ptr () - qd->msg_block_->base ();
- size_t const wr_pos = qd->msg_block_->wr_ptr () - qd->msg_block_->base ();
+ size_t wr_pos = qd->msg_block_->wr_ptr () - qd->msg_block_->base ();
rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN;
if (TAO_debug_level > 0)
@@ -743,7 +642,7 @@ TAO_GIOP_Message_Base::process_reply_message (
// Get the read and write positions before we steal data.
size_t rd_pos = qd->msg_block_->rd_ptr () - qd->msg_block_->base ();
- size_t const wr_pos = qd->msg_block_->wr_ptr () - qd->msg_block_->base ();
+ size_t wr_pos = qd->msg_block_->wr_ptr () - qd->msg_block_->base ();
rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN;
if (TAO_debug_level > 0)
@@ -805,8 +704,8 @@ TAO_GIOP_Message_Base::process_reply_message (
// every reply on this connection.
if (TAO_debug_level > 0)
ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("TAO (%P|%t) - GIOP_Message_Base[%d]::process_parsed_messages, ")
- ACE_TEXT ("dispatch reply failed\n"),
+ "TAO (%P|%t) - GIOP_Message_Base[%d]::process_parsed_messages, "
+ "dispatch reply failed\n",
params.transport_->id ()));
}
@@ -839,8 +738,7 @@ TAO_GIOP_Message_Base::generate_exception_reply (
// happened -> no hope, close connection.
// Close the handle.
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
+ ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("(%P|%t|%N|%l) cannot marshal exception, ")
ACE_TEXT ("generate_exception_reply ()\n")));
return -1;
@@ -856,6 +754,7 @@ TAO_GIOP_Message_Base::write_protocol_header (TAO_GIOP_Message_Type type,
TAO_OutputCDR &msg)
{
// Reset the message type
+ // Reset the message type
msg.reset ();
CORBA::Octet header[12] =
@@ -931,11 +830,11 @@ TAO_GIOP_Message_Base::process_request (TAO_Transport *transport,
CORBA::Object_var forward_to;
/*
- * Hook to specialize request processing within TAO
+ * Hook to specialize request processing within TAO
* This hook will be replaced by specialized request
* processing implementation.
*/
-//@@ TAO_DISPATCH_RESOLUTION_OPT_COMMENT_HOOK_START
+//@@ TAO_DISPATCH_RESOLUTION_OPT_COMMENT_HOOK_START
// Do this before the reply is sent.
this->orb_core_->request_dispatcher ()->dispatch (
@@ -949,18 +848,10 @@ TAO_GIOP_Message_Base::process_request (TAO_Transport *transport,
if (!CORBA::is_nil (forward_to.in ()))
{
- const CORBA::Boolean permanent_forward_condition =
- this->orb_core_->is_permanent_forward_condition
- (forward_to.in (),
- request.request_service_context ());
-
// We should forward to another object...
TAO_Pluggable_Reply_Params_Base reply_params;
reply_params.request_id_ = request_id;
- reply_params.reply_status_ =
- permanent_forward_condition
- ? TAO_GIOP_LOCATION_FORWARD_PERM
- : TAO_GIOP_LOCATION_FORWARD;
+ reply_params.reply_status_ = TAO_GIOP_LOCATION_FORWARD;
reply_params.svc_ctx_.length (0);
// Send back the reply service context.
@@ -1432,8 +1323,8 @@ TAO_GIOP_Message_Base::
{
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) TAO_GIOP_Message_Base::send_close_connection -")
- ACE_TEXT (" connection already closed\n")));
+ "TAO (%P|%t) TAO_GIOP_Message_Base::send_close_connection -"
+ " connection already closed\n"));
return;
}
#endif
@@ -1454,14 +1345,14 @@ TAO_GIOP_Message_Base::
{
if (TAO_debug_level > 0)
ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("(%P|%t) error closing connection %u, errno = %d\n"),
- transport->id (), errno));
+ "(%P|%t) error closing connection %u, errno = %d\n",
+ transport->id (), errno));
}
transport->close_connection ();
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%P|%t) shut down transport, handle %d\n"),
- transport-> id ()));
+ "(%P|%t) shut down transport, handle %d\n",
+ transport-> id ()));
}
@@ -1574,12 +1465,12 @@ TAO_GIOP_Message_Base::dump_msg (const char *label,
"TAO (%P|%t) - GIOP_Message_Base::dump_msg, "
"%s GIOP v%c.%c msg, %d data bytes, %s endian, "
"Type %s[%u]\n",
- ACE_TEXT_CHAR_TO_TCHAR (label),
+ ACE_TEXT_TO_TCHAR_IN (label),
digits[ptr[TAO_GIOP_VERSION_MAJOR_OFFSET]],
digits[ptr[TAO_GIOP_VERSION_MINOR_OFFSET]],
len - TAO_GIOP_MESSAGE_HEADER_LEN ,
(byte_order == TAO_ENCAP_BYTE_ORDER) ? ACE_TEXT("my") : ACE_TEXT("other"),
- ACE_TEXT_CHAR_TO_TCHAR(message_name),
+ ACE_TEXT_TO_TCHAR_IN(message_name),
*id));
if (TAO_debug_level >= 10)
@@ -1629,17 +1520,6 @@ TAO_GIOP_Message_Base::make_queued_data (size_t sz)
TAO_Queued_Data::make_queued_data (
this->orb_core_->transport_message_buffer_allocator ());
- if (qd == 0)
- {
- if (TAO_debug_level > 0)
- {
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::make_queued_data, ")
- ACE_TEXT ("our of memory, failed to allocate queued data object\n")));
- }
- return 0; // NULL pointer
- }
-
// @@todo: We have a similar method in Transport.cpp. Need to see how
// we can factor them out..
// Make a datablock for the size requested + something. The
@@ -1651,58 +1531,20 @@ TAO_GIOP_Message_Base::make_queued_data (size_t sz)
this->orb_core_->create_input_cdr_data_block (sz +
ACE_CDR::MAX_ALIGNMENT);
- if (db == 0)
- {
- TAO_Queued_Data::release (qd);
-
- if (TAO_debug_level > 0)
- {
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::make_queued_data, ")
- ACE_TEXT ("out of memory, failed to allocate input data block of size %u\n"),
- sz));
- }
- return 0; // NULL pointer
- }
-
ACE_Allocator *alloc =
this->orb_core_->input_cdr_msgblock_allocator ();
- if (alloc == 0)
- {
- if (TAO_debug_level >= 8)
- {
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%P|%t) - TAO_GIOP_Message_Base::make_queued_data,")
- ACE_TEXT (" no ACE_Allocator defined\n")));
- }
- }
-
-
ACE_Message_Block mb (db,
0,
alloc);
ACE_Message_Block *new_mb = mb.duplicate ();
- if (new_mb == 0)
- {
- TAO_Queued_Data::release (qd);
- db->release();
-
- if (TAO_debug_level > 0)
- {
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::make_queued_data, ")
- ACE_TEXT ("out of memory, failed to allocate message block\n")));
- }
- return 0;
- }
-
ACE_CDR::mb_align (new_mb);
qd->msg_block_ = new_mb;
+
return qd;
}
@@ -1728,343 +1570,13 @@ TAO_GIOP_Message_Base::fragment_header_length (CORBA::Octet major,
void
TAO_GIOP_Message_Base::init_queued_data (
- TAO_Queued_Data* qd,
- const TAO_GIOP_Message_State& state) const
+ TAO_Queued_Data* qd,
+ const TAO_GIOP_Message_State& state) const
{
qd->byte_order_ = state.byte_order_;
qd->major_version_ = state.giop_version_.major;
qd->minor_version_ = state.giop_version_.minor;
qd->more_fragments_ = state.more_fragments_;
+ qd->request_id_ = state.request_id_;
qd->msg_type_ = this->message_type (state);
}
-
-int
-TAO_GIOP_Message_Base::parse_request_id (const TAO_Queued_Data *qd, CORBA::ULong &request_id) const
-{
- // Get a parser for us
- TAO_GIOP_Message_Generator_Parser *generator_parser = 0;
-
- // Get the state information that we need to use
- this->set_state (qd->major_version_,
- qd->minor_version_,
- generator_parser);
-
- // Get the read and write positions before we steal data.
- size_t rd_pos = qd->msg_block_->rd_ptr () - qd->msg_block_->base ();
- size_t wr_pos = qd->msg_block_->wr_ptr () - qd->msg_block_->base ();
- rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN;
-
- // Create a input CDR stream. We do the following
- // 1 - If the incoming message block has a data block with a flag
- // DONT_DELETE (for the data block) we create an input CDR
- // stream the same way.
- // 2 - If the incoming message block had a datablock from heap just
- // use it by duplicating it and make the flag 0.
- // NOTE: We use the same data block in which we read the message and
- // we pass it on to the higher layers of the ORB. So we dont to any
- // copies at all here. The same is also done in the higher layers.
-
- ACE_Message_Block::Message_Flags flg = 0;
- ACE_Data_Block *db = 0;
-
- // Get the flag in the message block
- flg = qd->msg_block_->self_flags ();
-
- if (ACE_BIT_ENABLED (flg,
- ACE_Message_Block::DONT_DELETE))
- {
- // Use the same datablock
- db = qd->msg_block_->data_block ();
- }
- else
- {
- // Use a duplicated datablock as the datablock has come off the
- // heap.
- db = qd->msg_block_->data_block ()->duplicate ();
- }
-
-
- TAO_InputCDR input_cdr (db,
- flg,
- rd_pos,
- wr_pos,
- qd->byte_order_,
- qd->major_version_,
- qd->minor_version_,
- this->orb_core_);
-
- if (qd->major_version_ >= 1 &&
- (qd->minor_version_ == 0 || qd->minor_version_ == 1))
- {
- if (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_REQUEST ||
- qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_REPLY)
- {
- IOP::ServiceContextList service_context;
-
- if ( ! (input_cdr >> service_context &&
- input_cdr >> request_id) )
- {
- return -1;
- }
-
- return 0;
- }
- else if (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_CANCELREQUEST ||
- qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST ||
- qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_LOCATEREPLY)
- {
- if ( ! (input_cdr >> request_id) )
- {
- return -1;
- }
-
- return 0;
- }
- else
- {
- return -1;
- }
- }
- else
- {
- if (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_REQUEST ||
- qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_REPLY ||
- qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT ||
- qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_CANCELREQUEST ||
- qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST ||
- qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_LOCATEREPLY)
- {
- // Dealing with GIOP-1.2, the request-id is located directly behind the GIOP-Header.
- // This is true for all message types that might be sent in form of fragments or cancel-requests.
- if ( ! (input_cdr >> request_id) )
- {
- return -1;
- }
-
- return 0;
- }
- else
- {
- return -1;
- }
- }
-
- return -1;
-}
-
-/* @return -1 error, 0 ok, +1 outstanding fragments */
-int
-TAO_GIOP_Message_Base::consolidate_fragmented_message (TAO_Queued_Data *qd, TAO_Queued_Data *&msg)
-{
- TAO::Incoming_Message_Stack reverse_stack;
-
- TAO_Queued_Data *tail = 0;
- TAO_Queued_Data *head = 0;
-
- //
- // CONSOLIDATE FRAGMENTED MESSAGE
- //
-
- // check for error-condition
- if (qd == 0)
- {
- return -1;
- }
-
- if (qd->major_version_ == 1 && qd->minor_version_ == 0)
- {
- TAO_Queued_Data::release (qd);
- return -1; // error: GIOP-1.0 does not support fragments
- }
-
- // If this is not the last fragment, push it onto stack for later processing
- if (qd->more_fragments_)
- {
- this->fragment_stack_.push (qd);
-
- msg = 0; // no consolidated message available yet
- return 1; // status: more messages expected.
- }
-
- tail = qd; // init
-
- // Add the current message block to the end of the chain
- // after adjusting the read pointer to skip the header(s)
- const size_t header_adjustment =
- this->header_length () +
- this->fragment_header_length (tail->major_version_,
- tail->minor_version_);
-
- if (tail->msg_block_->length () < header_adjustment)
- {
- // buffer length not sufficient
- TAO_Queued_Data::release (qd);
- return -1;
- }
-
- // duplicate code to speed up both processes, for GIOP-1.1 and GIOP-1.2
- if (tail->major_version_ == 1 && tail->minor_version_ == 1)
- {
- // GIOP-1.1
-
- while (this->fragment_stack_.pop (head) != -1)
- {
- if (head->more_fragments_ &&
- head->major_version_ == 1 &&
- head->minor_version_ == 1 &&
- head->msg_block_->length () >= header_adjustment)
- {
- // adjust the read-pointer, skip the fragment header
- tail->msg_block_->rd_ptr(header_adjustment);
-
- head->msg_block_->cont (tail->msg_block_);
-
- tail->msg_block_ = 0;
-
- TAO_Queued_Data::release (tail);
-
- tail = head;
- }
- else
- {
- reverse_stack.push (head);
- }
- }
- }
- else
- {
- // > GIOP-1.2
-
- CORBA::ULong tmp_request_id = 0;
- if (this->parse_request_id (tail, tmp_request_id) == -1)
- {
- return -1;
- }
-
- const CORBA::ULong request_id = tmp_request_id;
-
- while (this->fragment_stack_.pop (head) != -1)
- {
- CORBA::ULong head_request_id = 0;
- int parse_status = 0;
-
- if (head->more_fragments_ &&
- head->major_version_ >= 1 &&
- head->minor_version_ >= 2 &&
- head->msg_block_->length () >= header_adjustment &&
- (parse_status = this->parse_request_id (head, head_request_id)) != -1 &&
- request_id == head_request_id)
- {
- // adjust the read-pointer, skip the fragment header
- tail->msg_block_->rd_ptr(header_adjustment);
-
- head->msg_block_->cont (tail->msg_block_);
-
- tail->msg_block_ = 0;
-
- TAO_Queued_Data::release (tail);
-
- tail = head;
- }
- else
- {
- if (parse_status == -1)
- {
- TAO_Queued_Data::release (head);
- return -1;
- }
-
- reverse_stack.push (head);
- }
- }
- }
-
- // restore stack
- while (reverse_stack.pop (head) != -1)
- {
- this->fragment_stack_.push (head);
- }
-
- if (tail->consolidate () == -1)
- {
- // memory allocation failed
- TAO_Queued_Data::release (tail);
- return -1;
- }
-
- // set out value
- msg = tail;
-
- return 0;
-}
-
-
-int
-TAO_GIOP_Message_Base::discard_fragmented_message (const TAO_Queued_Data *cancel_request)
-{
- // We must extract the specific request-id from message-buffer
- // and remove all fragments from stack that match this request-id.
-
- TAO::Incoming_Message_Stack reverse_stack;
-
- CORBA::ULong cancel_request_id;
-
- if (this->parse_request_id (cancel_request, cancel_request_id) == -1)
- {
- return -1;
- }
-
- TAO_Queued_Data *head = 0;
-
- // Revert stack
- while (this->fragment_stack_.pop (head) != -1)
- {
- reverse_stack.push (head);
- }
-
- bool discard_all_GIOP11_messages = false;
-
- // Now we are able to process message in order they have arrived.
- // If the cancel_request_id matches to GIOP-1.1 message, all succeeding
- // fragments belong to this message and must be discarded.
- // Note: GIOP-1.1 fragment header dont have any request-id encoded. If the
- // cancel_request_id matches GIOP-1.2 messages, all GIOP-1.2 fragments
- // having encoded the request id will be discarded.
- while (reverse_stack.pop (head) != -1)
- {
- CORBA::ULong head_request_id;
-
- if (head->major_version_ == 1 &&
- head->minor_version_ <= 1 &&
- head->msg_type_ != TAO_PLUGGABLE_MESSAGE_FRAGMENT && // GIOP11 fragment does not provide request id
- this->parse_request_id (head, head_request_id) >= 0 &&
- cancel_request_id == head_request_id)
- {
- TAO_Queued_Data::release (head);
-
- discard_all_GIOP11_messages = true;
- }
- else if (head->major_version_ == 1 &&
- head->minor_version_ <= 1 &&
- discard_all_GIOP11_messages)
- {
- TAO_Queued_Data::release (head);
- }
- else if (head->major_version_ >= 1 &&
- head->minor_version_ >= 2 &&
- this->parse_request_id (head, head_request_id) >= 0 &&
- cancel_request_id == head_request_id)
- {
- TAO_Queued_Data::release (head);
- }
- else
- {
- this->fragment_stack_.push (head);
- }
- }
-
- return 0;
-}
-
-
-TAO_END_VERSIONED_NAMESPACE_DECL