diff options
Diffstat (limited to 'TAO/tao/GIOP_Message_Base.cpp')
-rw-r--r-- | TAO/tao/GIOP_Message_Base.cpp | 706 |
1 files changed, 109 insertions, 597 deletions
diff --git a/TAO/tao/GIOP_Message_Base.cpp b/TAO/tao/GIOP_Message_Base.cpp index 77a379e1a10..aa0efbb8b97 100644 --- a/TAO/tao/GIOP_Message_Base.cpp +++ b/TAO/tao/GIOP_Message_Base.cpp @@ -1,17 +1,17 @@ // $Id$ -#include "tao/GIOP_Message_Base.h" -#include "tao/operation_details.h" -#include "tao/debug.h" -#include "tao/ORB_Core.h" -#include "tao/TAO_Server_Request.h" -#include "tao/GIOP_Message_Locate_Header.h" -#include "tao/Transport.h" -#include "tao/Transport_Mux_Strategy.h" -#include "tao/LF_Strategy.h" -#include "tao/Request_Dispatcher.h" -#include "tao/Codeset_Manager.h" -#include "tao/SystemException.h" +#include "GIOP_Message_Base.h" +#include "operation_details.h" +#include "debug.h" +#include "ORB_Core.h" +#include "TAO_Server_Request.h" +#include "GIOP_Message_Locate_Header.h" +#include "Transport.h" +#include "Transport_Mux_Strategy.h" +#include "LF_Strategy.h" +#include "Request_Dispatcher.h" +#include "Codeset_Manager.h" +#include "SystemException.h" /* * Hook to add additional include files during specializations. @@ -22,24 +22,22 @@ ACE_RCSID (tao, GIOP_Message_Base, "$Id$") -TAO_BEGIN_VERSIONED_NAMESPACE_DECL - -TAO_GIOP_Message_Base::TAO_GIOP_Message_Base (TAO_ORB_Core * orb_core, - size_t /* input_cdr_size */) +TAO_GIOP_Message_Base::TAO_GIOP_Message_Base (TAO_ORB_Core *orb_core, + size_t /*input_cdr_size*/) : orb_core_ (orb_core) - , message_state_ () - , out_stream_ (this->buffer_, - sizeof this->buffer_, /* ACE_CDR::DEFAULT_BUFSIZE */ - TAO_ENCAP_BYTE_ORDER, - orb_core->output_cdr_buffer_allocator (), - orb_core->output_cdr_dblock_allocator (), - orb_core->output_cdr_msgblock_allocator (), - orb_core->orb_params ()->cdr_memcpy_tradeoff (), - TAO_DEF_GIOP_MAJOR, - TAO_DEF_GIOP_MINOR) + , message_state_ () + , out_stream_ (this->buffer_, + sizeof this->buffer_, /* ACE_CDR::DEFAULT_BUFSIZE */ + TAO_ENCAP_BYTE_ORDER, + orb_core->output_cdr_buffer_allocator (), + orb_core->output_cdr_dblock_allocator (), + orb_core->output_cdr_msgblock_allocator (), + orb_core->orb_params ()->cdr_memcpy_tradeoff (), + TAO_DEF_GIOP_MAJOR, + TAO_DEF_GIOP_MINOR) { #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE) - ACE_OS::memset (this->buffer_, 0, sizeof (buffer_)); + ACE_OS::memset(buffer_, 0, sizeof (buffer_)); #endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */ } @@ -198,7 +196,7 @@ TAO_GIOP_Message_Base::generate_reply_header ( ACE_TRY { // Now call the implementation for the rest of the header - int const result = + int result = generator_parser->write_reply_header (cdr, params ACE_ENV_ARG_PARAMETER); @@ -218,7 +216,7 @@ TAO_GIOP_Message_Base::generate_reply_header ( { if (TAO_debug_level > 4) ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, - ACE_TEXT ("TAO_GIOP_Message_Base::generate_reply_header")); + "TAO_GIOP_Message_Base::generate_reply_header"); return -1; } @@ -227,6 +225,15 @@ TAO_GIOP_Message_Base::generate_reply_header ( return 0; } + +int +TAO_GIOP_Message_Base::read_message (TAO_Transport * /*transport*/, + int /*block */, + ACE_Time_Value * /*max_wait_time*/) +{ + return 0; +} + int TAO_GIOP_Message_Base::format_message (TAO_OutputCDR &stream) { @@ -234,7 +241,7 @@ TAO_GIOP_Message_Base::format_message (TAO_OutputCDR &stream) char *buf = (char *) stream.buffer (); // Length of all buffers. - size_t const total_len = + size_t total_len = stream.total_length (); // NOTE: Here would also be a fine place to calculate a digital @@ -244,7 +251,7 @@ TAO_GIOP_Message_Base::format_message (TAO_OutputCDR &stream) // this particular environment and that isn't handled by the // networking infrastructure (e.g., IPSEC). - CORBA::ULong const bodylen = static_cast <CORBA::ULong> + CORBA::ULong bodylen = static_cast <CORBA::ULong> (total_len - TAO_GIOP_MESSAGE_HEADER_LEN); #if !defined (ACE_ENABLE_SWAP_ON_WRITE) @@ -300,7 +307,6 @@ TAO_GIOP_Message_Base::message_type ( case TAO_GIOP_LOCATEREPLY: return TAO_PLUGGABLE_MESSAGE_LOCATEREPLY; - case TAO_GIOP_REPLY: return TAO_PLUGGABLE_MESSAGE_REPLY; @@ -311,19 +317,13 @@ TAO_GIOP_Message_Base::message_type ( return TAO_PLUGGABLE_MESSAGE_FRAGMENT; case TAO_GIOP_MESSAGERROR: - return TAO_PLUGGABLE_MESSAGE_MESSAGERROR; - case TAO_GIOP_CANCELREQUEST: - return TAO_PLUGGABLE_MESSAGE_CANCELREQUEST; - + // Does it happen? why?? default: - if (TAO_debug_level > 0) - { ACE_ERROR ((LM_ERROR, ACE_TEXT ("TAO (%P|%t) %N:%l message_type : ") ACE_TEXT ("wrong message.\n"))); } - } return TAO_PLUGGABLE_MESSAGE_MESSAGERROR; } @@ -331,51 +331,32 @@ TAO_GIOP_Message_Base::message_type ( int TAO_GIOP_Message_Base::parse_incoming_messages (ACE_Message_Block &incoming) { - this->message_state_.reset (); - return this->message_state_.parse_message_header (incoming); } -int -TAO_GIOP_Message_Base::parse_next_message (ACE_Message_Block &incoming, - TAO_Queued_Data &qd, - size_t &mesg_length) +ssize_t +TAO_GIOP_Message_Base::missing_data (ACE_Message_Block &incoming) { - if (incoming.length () < TAO_GIOP_MESSAGE_HEADER_LEN) - { - qd.missing_data_ = TAO_MISSING_DATA_UNDEFINED; - - return 0; /* incomplete header */ - } - else - { - TAO_GIOP_Message_State state; + // Actual message size including the header.. + CORBA::ULong msg_size = + this->message_state_.message_size (); - if (state.parse_message_header (incoming) == -1) - { - return -1; - } - - const size_t message_size = state.message_size (); /* Header + Payload */ - - if (message_size > incoming.length ()) - { - qd.missing_data_ = message_size - incoming.length (); - } - else - { - qd.missing_data_ = 0; - } - - /* init out-parameters */ - this->init_queued_data (&qd, state); - mesg_length = TAO_GIOP_MESSAGE_HEADER_LEN - + state.payload_size (); + size_t len = incoming.length (); - return 1; /* complete header */ + // If we have too many messages or if we have less than even a size + // of the GIOP header then .. + if (len > msg_size || + len < TAO_GIOP_MESSAGE_HEADER_LEN) + { + return -1; } + else if (len == msg_size) + return 0; + + return msg_size - len; } + int TAO_GIOP_Message_Base::extract_next_message (ACE_Message_Block &incoming, TAO_Queued_Data *&qd) @@ -384,45 +365,15 @@ TAO_GIOP_Message_Base::extract_next_message (ACE_Message_Block &incoming, { if (incoming.length () > 0) { - // Optimize memory usage, we dont know actual message size - // so far, but allocate enough space to hold small GIOP - // messages. This way we avoid expensive "grow" operation - // for small messages. - const size_t default_buf_size = ACE_CDR::DEFAULT_BUFSIZE; - - // Make a node which has at least message block of the size - // of MESSAGE_HEADER_LEN. - const size_t buf_size = ace_max (TAO_GIOP_MESSAGE_HEADER_LEN, - default_buf_size); - - // POST: buf_size >= TAO_GIOP_MESSAGE_HEADER_LEN - - qd = this->make_queued_data (buf_size); - - if (qd == 0) - { - if (TAO_debug_level > 0) - { - ACE_ERROR((LM_ERROR, - ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::extract_next_message, ") - ACE_TEXT ("out of memory\n"))); - } - return -1; - } + // Make a node which has a message block of the size of + // MESSAGE_HEADER_LEN. + qd = + this->make_queued_data (TAO_GIOP_MESSAGE_HEADER_LEN); qd->msg_block_->copy (incoming.rd_ptr (), incoming.length ()); - - incoming.rd_ptr (incoming.length ()); // consume all available data - - qd->missing_data_ = TAO_MISSING_DATA_UNDEFINED; + qd->missing_data_ = -1; } - else - { - // handle not initialized variables - qd = 0; // reset - } - return 0; } @@ -436,26 +387,12 @@ TAO_GIOP_Message_Base::extract_next_message (ACE_Message_Block &incoming, qd = this->make_queued_data (copying_len); - if (qd == 0) - { - if (TAO_debug_level > 0) - { - ACE_ERROR ((LM_ERROR, - ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::extract_next_message, ") - ACE_TEXT ("out of memory\n"))); - } - return -1; - } - if (copying_len > incoming.length ()) { qd->missing_data_ = copying_len - incoming.length (); + copying_len = incoming.length (); } - else - { - qd->missing_data_ = 0; - } qd->msg_block_->copy (incoming.rd_ptr (), copying_len); @@ -471,71 +408,35 @@ TAO_GIOP_Message_Base::consolidate_node (TAO_Queued_Data *qd, ACE_Message_Block &incoming) { // Look to see whether we had atleast parsed the GIOP header ... - if (qd->missing_data_ == TAO_MISSING_DATA_UNDEFINED) + if (qd->missing_data_ == -1) { // The data length that has been stuck in there during the last // read .... - size_t const len = + size_t len = qd->msg_block_->length (); - // paranoid check - if (len >= TAO_GIOP_MESSAGE_HEADER_LEN) - { - // inconsistency - this code should have parsed the header - // so far - return -1; - } - // We know that we would have space for // TAO_GIOP_MESSAGE_HEADER_LEN here. So copy that much of data // from the <incoming> into the message block in <qd> - const size_t available = incoming.length (); - const size_t desired = TAO_GIOP_MESSAGE_HEADER_LEN - len; - const size_t n_copy = ace_min (available, desired); - - // paranoid check, but would cause endless looping - if (n_copy == 0) - { - return -1; - } - - if (qd->msg_block_->copy (incoming.rd_ptr (), - n_copy) == -1) - { - return -1; - } + qd->msg_block_->copy (incoming.rd_ptr (), + TAO_GIOP_MESSAGE_HEADER_LEN - len); // Move the rd_ptr () in the incoming message block.. - incoming.rd_ptr (n_copy); - - // verify sufficient data to parse GIOP header - if (qd->msg_block_->length () < TAO_GIOP_MESSAGE_HEADER_LEN) - { - return 0; /* continue */ - } + incoming.rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN - len); TAO_GIOP_Message_State state; // Parse the message header now... if (state.parse_message_header (*qd->msg_block_) == -1) + return -1; + + // Now grow the message block so that we can copy the rest of + // the data... + if (qd->msg_block_->space () < state.message_size ()) { - if (TAO_debug_level > 0) - { - ACE_ERROR ((LM_ERROR, - ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::consolidate_node, ") - ACE_TEXT ("error parsing header\n") )); - } - return -1; + ACE_CDR::grow (qd->msg_block_, + state.message_size ()); } - // Now grow the message block so that we can copy the rest of - // the data, the message_block must be able to hold complete message - if (ACE_CDR::grow (qd->msg_block_, - state.message_size ()) == -1) /* GIOP_Header + Payload */ - { - // on mem-error get rid of context silently, try to avoid - // system calls that might allocate additional memory - return -1; - } // Copy the pay load.. // Calculate the bytes that needs to be copied in the queue... @@ -558,11 +459,8 @@ TAO_GIOP_Message_Base::consolidate_node (TAO_Queued_Data *qd, // ..now we are set to copy the right amount of data to the // node.. - if (qd->msg_block_->copy (incoming.rd_ptr (), - copy_len) == -1) - { - return -1; - } + qd->msg_block_->copy (incoming.rd_ptr (), + copy_len); // Set the <rd_ptr> of the <incoming>.. incoming.rd_ptr (copy_len); @@ -584,19 +482,10 @@ TAO_GIOP_Message_Base::consolidate_node (TAO_Queued_Data *qd, copy_len = incoming.length (); } - // paranoid check for endless-event-looping - if (copy_len == 0) - { - return -1; - } - // Copy the right amount of data in to the node... // node.. - if (qd->msg_block_->copy (incoming.rd_ptr (), - copy_len) == -1) - { - return -1; - } + qd->msg_block_->copy (incoming.rd_ptr (), + copy_len); // Set the <rd_ptr> of the <incoming>.. qd->msg_block_->rd_ptr (copy_len); @@ -606,6 +495,16 @@ TAO_GIOP_Message_Base::consolidate_node (TAO_Queued_Data *qd, return 0; } +void +TAO_GIOP_Message_Base::get_message_data (TAO_Queued_Data *qd) +{ + // Get the message information + this->init_queued_data (qd, this->message_state_); + + // Reset the message_state + this->message_state_.reset (); +} + int TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport, TAO_Queued_Data *qd) @@ -650,7 +549,7 @@ TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport, // Get the read and write positions before we steal data. size_t rd_pos = qd->msg_block_->rd_ptr () - qd->msg_block_->base (); - size_t const wr_pos = qd->msg_block_->wr_ptr () - qd->msg_block_->base (); + size_t wr_pos = qd->msg_block_->wr_ptr () - qd->msg_block_->base (); rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN; if (TAO_debug_level > 0) @@ -743,7 +642,7 @@ TAO_GIOP_Message_Base::process_reply_message ( // Get the read and write positions before we steal data. size_t rd_pos = qd->msg_block_->rd_ptr () - qd->msg_block_->base (); - size_t const wr_pos = qd->msg_block_->wr_ptr () - qd->msg_block_->base (); + size_t wr_pos = qd->msg_block_->wr_ptr () - qd->msg_block_->base (); rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN; if (TAO_debug_level > 0) @@ -805,8 +704,8 @@ TAO_GIOP_Message_Base::process_reply_message ( // every reply on this connection. if (TAO_debug_level > 0) ACE_ERROR ((LM_ERROR, - ACE_TEXT ("TAO (%P|%t) - GIOP_Message_Base[%d]::process_parsed_messages, ") - ACE_TEXT ("dispatch reply failed\n"), + "TAO (%P|%t) - GIOP_Message_Base[%d]::process_parsed_messages, " + "dispatch reply failed\n", params.transport_->id ())); } @@ -839,8 +738,7 @@ TAO_GIOP_Message_Base::generate_exception_reply ( // happened -> no hope, close connection. // Close the handle. - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t|%N|%l) cannot marshal exception, ") ACE_TEXT ("generate_exception_reply ()\n"))); return -1; @@ -856,6 +754,7 @@ TAO_GIOP_Message_Base::write_protocol_header (TAO_GIOP_Message_Type type, TAO_OutputCDR &msg) { // Reset the message type + // Reset the message type msg.reset (); CORBA::Octet header[12] = @@ -931,11 +830,11 @@ TAO_GIOP_Message_Base::process_request (TAO_Transport *transport, CORBA::Object_var forward_to; /* - * Hook to specialize request processing within TAO + * Hook to specialize request processing within TAO * This hook will be replaced by specialized request * processing implementation. */ -//@@ TAO_DISPATCH_RESOLUTION_OPT_COMMENT_HOOK_START +//@@ TAO_DISPATCH_RESOLUTION_OPT_COMMENT_HOOK_START // Do this before the reply is sent. this->orb_core_->request_dispatcher ()->dispatch ( @@ -949,18 +848,10 @@ TAO_GIOP_Message_Base::process_request (TAO_Transport *transport, if (!CORBA::is_nil (forward_to.in ())) { - const CORBA::Boolean permanent_forward_condition = - this->orb_core_->is_permanent_forward_condition - (forward_to.in (), - request.request_service_context ()); - // We should forward to another object... TAO_Pluggable_Reply_Params_Base reply_params; reply_params.request_id_ = request_id; - reply_params.reply_status_ = - permanent_forward_condition - ? TAO_GIOP_LOCATION_FORWARD_PERM - : TAO_GIOP_LOCATION_FORWARD; + reply_params.reply_status_ = TAO_GIOP_LOCATION_FORWARD; reply_params.svc_ctx_.length (0); // Send back the reply service context. @@ -1432,8 +1323,8 @@ TAO_GIOP_Message_Base:: { if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) TAO_GIOP_Message_Base::send_close_connection -") - ACE_TEXT (" connection already closed\n"))); + "TAO (%P|%t) TAO_GIOP_Message_Base::send_close_connection -" + " connection already closed\n")); return; } #endif @@ -1454,14 +1345,14 @@ TAO_GIOP_Message_Base:: { if (TAO_debug_level > 0) ACE_ERROR ((LM_ERROR, - ACE_TEXT ("(%P|%t) error closing connection %u, errno = %d\n"), - transport->id (), errno)); + "(%P|%t) error closing connection %u, errno = %d\n", + transport->id (), errno)); } transport->close_connection (); ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%P|%t) shut down transport, handle %d\n"), - transport-> id ())); + "(%P|%t) shut down transport, handle %d\n", + transport-> id ())); } @@ -1574,12 +1465,12 @@ TAO_GIOP_Message_Base::dump_msg (const char *label, "TAO (%P|%t) - GIOP_Message_Base::dump_msg, " "%s GIOP v%c.%c msg, %d data bytes, %s endian, " "Type %s[%u]\n", - ACE_TEXT_CHAR_TO_TCHAR (label), + ACE_TEXT_TO_TCHAR_IN (label), digits[ptr[TAO_GIOP_VERSION_MAJOR_OFFSET]], digits[ptr[TAO_GIOP_VERSION_MINOR_OFFSET]], len - TAO_GIOP_MESSAGE_HEADER_LEN , (byte_order == TAO_ENCAP_BYTE_ORDER) ? ACE_TEXT("my") : ACE_TEXT("other"), - ACE_TEXT_CHAR_TO_TCHAR(message_name), + ACE_TEXT_TO_TCHAR_IN(message_name), *id)); if (TAO_debug_level >= 10) @@ -1629,17 +1520,6 @@ TAO_GIOP_Message_Base::make_queued_data (size_t sz) TAO_Queued_Data::make_queued_data ( this->orb_core_->transport_message_buffer_allocator ()); - if (qd == 0) - { - if (TAO_debug_level > 0) - { - ACE_ERROR ((LM_ERROR, - ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::make_queued_data, ") - ACE_TEXT ("our of memory, failed to allocate queued data object\n"))); - } - return 0; // NULL pointer - } - // @@todo: We have a similar method in Transport.cpp. Need to see how // we can factor them out.. // Make a datablock for the size requested + something. The @@ -1651,58 +1531,20 @@ TAO_GIOP_Message_Base::make_queued_data (size_t sz) this->orb_core_->create_input_cdr_data_block (sz + ACE_CDR::MAX_ALIGNMENT); - if (db == 0) - { - TAO_Queued_Data::release (qd); - - if (TAO_debug_level > 0) - { - ACE_ERROR ((LM_ERROR, - ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::make_queued_data, ") - ACE_TEXT ("out of memory, failed to allocate input data block of size %u\n"), - sz)); - } - return 0; // NULL pointer - } - ACE_Allocator *alloc = this->orb_core_->input_cdr_msgblock_allocator (); - if (alloc == 0) - { - if (TAO_debug_level >= 8) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%P|%t) - TAO_GIOP_Message_Base::make_queued_data,") - ACE_TEXT (" no ACE_Allocator defined\n"))); - } - } - - ACE_Message_Block mb (db, 0, alloc); ACE_Message_Block *new_mb = mb.duplicate (); - if (new_mb == 0) - { - TAO_Queued_Data::release (qd); - db->release(); - - if (TAO_debug_level > 0) - { - ACE_ERROR ((LM_ERROR, - ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::make_queued_data, ") - ACE_TEXT ("out of memory, failed to allocate message block\n"))); - } - return 0; - } - ACE_CDR::mb_align (new_mb); qd->msg_block_ = new_mb; + return qd; } @@ -1728,343 +1570,13 @@ TAO_GIOP_Message_Base::fragment_header_length (CORBA::Octet major, void TAO_GIOP_Message_Base::init_queued_data ( - TAO_Queued_Data* qd, - const TAO_GIOP_Message_State& state) const + TAO_Queued_Data* qd, + const TAO_GIOP_Message_State& state) const { qd->byte_order_ = state.byte_order_; qd->major_version_ = state.giop_version_.major; qd->minor_version_ = state.giop_version_.minor; qd->more_fragments_ = state.more_fragments_; + qd->request_id_ = state.request_id_; qd->msg_type_ = this->message_type (state); } - -int -TAO_GIOP_Message_Base::parse_request_id (const TAO_Queued_Data *qd, CORBA::ULong &request_id) const -{ - // Get a parser for us - TAO_GIOP_Message_Generator_Parser *generator_parser = 0; - - // Get the state information that we need to use - this->set_state (qd->major_version_, - qd->minor_version_, - generator_parser); - - // Get the read and write positions before we steal data. - size_t rd_pos = qd->msg_block_->rd_ptr () - qd->msg_block_->base (); - size_t wr_pos = qd->msg_block_->wr_ptr () - qd->msg_block_->base (); - rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN; - - // Create a input CDR stream. We do the following - // 1 - If the incoming message block has a data block with a flag - // DONT_DELETE (for the data block) we create an input CDR - // stream the same way. - // 2 - If the incoming message block had a datablock from heap just - // use it by duplicating it and make the flag 0. - // NOTE: We use the same data block in which we read the message and - // we pass it on to the higher layers of the ORB. So we dont to any - // copies at all here. The same is also done in the higher layers. - - ACE_Message_Block::Message_Flags flg = 0; - ACE_Data_Block *db = 0; - - // Get the flag in the message block - flg = qd->msg_block_->self_flags (); - - if (ACE_BIT_ENABLED (flg, - ACE_Message_Block::DONT_DELETE)) - { - // Use the same datablock - db = qd->msg_block_->data_block (); - } - else - { - // Use a duplicated datablock as the datablock has come off the - // heap. - db = qd->msg_block_->data_block ()->duplicate (); - } - - - TAO_InputCDR input_cdr (db, - flg, - rd_pos, - wr_pos, - qd->byte_order_, - qd->major_version_, - qd->minor_version_, - this->orb_core_); - - if (qd->major_version_ >= 1 && - (qd->minor_version_ == 0 || qd->minor_version_ == 1)) - { - if (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_REQUEST || - qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_REPLY) - { - IOP::ServiceContextList service_context; - - if ( ! (input_cdr >> service_context && - input_cdr >> request_id) ) - { - return -1; - } - - return 0; - } - else if (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_CANCELREQUEST || - qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST || - qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_LOCATEREPLY) - { - if ( ! (input_cdr >> request_id) ) - { - return -1; - } - - return 0; - } - else - { - return -1; - } - } - else - { - if (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_REQUEST || - qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_REPLY || - qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT || - qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_CANCELREQUEST || - qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST || - qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_LOCATEREPLY) - { - // Dealing with GIOP-1.2, the request-id is located directly behind the GIOP-Header. - // This is true for all message types that might be sent in form of fragments or cancel-requests. - if ( ! (input_cdr >> request_id) ) - { - return -1; - } - - return 0; - } - else - { - return -1; - } - } - - return -1; -} - -/* @return -1 error, 0 ok, +1 outstanding fragments */ -int -TAO_GIOP_Message_Base::consolidate_fragmented_message (TAO_Queued_Data *qd, TAO_Queued_Data *&msg) -{ - TAO::Incoming_Message_Stack reverse_stack; - - TAO_Queued_Data *tail = 0; - TAO_Queued_Data *head = 0; - - // - // CONSOLIDATE FRAGMENTED MESSAGE - // - - // check for error-condition - if (qd == 0) - { - return -1; - } - - if (qd->major_version_ == 1 && qd->minor_version_ == 0) - { - TAO_Queued_Data::release (qd); - return -1; // error: GIOP-1.0 does not support fragments - } - - // If this is not the last fragment, push it onto stack for later processing - if (qd->more_fragments_) - { - this->fragment_stack_.push (qd); - - msg = 0; // no consolidated message available yet - return 1; // status: more messages expected. - } - - tail = qd; // init - - // Add the current message block to the end of the chain - // after adjusting the read pointer to skip the header(s) - const size_t header_adjustment = - this->header_length () + - this->fragment_header_length (tail->major_version_, - tail->minor_version_); - - if (tail->msg_block_->length () < header_adjustment) - { - // buffer length not sufficient - TAO_Queued_Data::release (qd); - return -1; - } - - // duplicate code to speed up both processes, for GIOP-1.1 and GIOP-1.2 - if (tail->major_version_ == 1 && tail->minor_version_ == 1) - { - // GIOP-1.1 - - while (this->fragment_stack_.pop (head) != -1) - { - if (head->more_fragments_ && - head->major_version_ == 1 && - head->minor_version_ == 1 && - head->msg_block_->length () >= header_adjustment) - { - // adjust the read-pointer, skip the fragment header - tail->msg_block_->rd_ptr(header_adjustment); - - head->msg_block_->cont (tail->msg_block_); - - tail->msg_block_ = 0; - - TAO_Queued_Data::release (tail); - - tail = head; - } - else - { - reverse_stack.push (head); - } - } - } - else - { - // > GIOP-1.2 - - CORBA::ULong tmp_request_id = 0; - if (this->parse_request_id (tail, tmp_request_id) == -1) - { - return -1; - } - - const CORBA::ULong request_id = tmp_request_id; - - while (this->fragment_stack_.pop (head) != -1) - { - CORBA::ULong head_request_id = 0; - int parse_status = 0; - - if (head->more_fragments_ && - head->major_version_ >= 1 && - head->minor_version_ >= 2 && - head->msg_block_->length () >= header_adjustment && - (parse_status = this->parse_request_id (head, head_request_id)) != -1 && - request_id == head_request_id) - { - // adjust the read-pointer, skip the fragment header - tail->msg_block_->rd_ptr(header_adjustment); - - head->msg_block_->cont (tail->msg_block_); - - tail->msg_block_ = 0; - - TAO_Queued_Data::release (tail); - - tail = head; - } - else - { - if (parse_status == -1) - { - TAO_Queued_Data::release (head); - return -1; - } - - reverse_stack.push (head); - } - } - } - - // restore stack - while (reverse_stack.pop (head) != -1) - { - this->fragment_stack_.push (head); - } - - if (tail->consolidate () == -1) - { - // memory allocation failed - TAO_Queued_Data::release (tail); - return -1; - } - - // set out value - msg = tail; - - return 0; -} - - -int -TAO_GIOP_Message_Base::discard_fragmented_message (const TAO_Queued_Data *cancel_request) -{ - // We must extract the specific request-id from message-buffer - // and remove all fragments from stack that match this request-id. - - TAO::Incoming_Message_Stack reverse_stack; - - CORBA::ULong cancel_request_id; - - if (this->parse_request_id (cancel_request, cancel_request_id) == -1) - { - return -1; - } - - TAO_Queued_Data *head = 0; - - // Revert stack - while (this->fragment_stack_.pop (head) != -1) - { - reverse_stack.push (head); - } - - bool discard_all_GIOP11_messages = false; - - // Now we are able to process message in order they have arrived. - // If the cancel_request_id matches to GIOP-1.1 message, all succeeding - // fragments belong to this message and must be discarded. - // Note: GIOP-1.1 fragment header dont have any request-id encoded. If the - // cancel_request_id matches GIOP-1.2 messages, all GIOP-1.2 fragments - // having encoded the request id will be discarded. - while (reverse_stack.pop (head) != -1) - { - CORBA::ULong head_request_id; - - if (head->major_version_ == 1 && - head->minor_version_ <= 1 && - head->msg_type_ != TAO_PLUGGABLE_MESSAGE_FRAGMENT && // GIOP11 fragment does not provide request id - this->parse_request_id (head, head_request_id) >= 0 && - cancel_request_id == head_request_id) - { - TAO_Queued_Data::release (head); - - discard_all_GIOP11_messages = true; - } - else if (head->major_version_ == 1 && - head->minor_version_ <= 1 && - discard_all_GIOP11_messages) - { - TAO_Queued_Data::release (head); - } - else if (head->major_version_ >= 1 && - head->minor_version_ >= 2 && - this->parse_request_id (head, head_request_id) >= 0 && - cancel_request_id == head_request_id) - { - TAO_Queued_Data::release (head); - } - else - { - this->fragment_stack_.push (head); - } - } - - return 0; -} - - -TAO_END_VERSIONED_NAMESPACE_DECL |