diff options
Diffstat (limited to 'TAO/tao/GIOP_Message_Base.cpp')
-rw-r--r-- | TAO/tao/GIOP_Message_Base.cpp | 830 |
1 files changed, 700 insertions, 130 deletions
diff --git a/TAO/tao/GIOP_Message_Base.cpp b/TAO/tao/GIOP_Message_Base.cpp index aa0efbb8b97..8d266a30a21 100644 --- a/TAO/tao/GIOP_Message_Base.cpp +++ b/TAO/tao/GIOP_Message_Base.cpp @@ -1,17 +1,17 @@ // $Id$ -#include "GIOP_Message_Base.h" -#include "operation_details.h" -#include "debug.h" -#include "ORB_Core.h" -#include "TAO_Server_Request.h" -#include "GIOP_Message_Locate_Header.h" -#include "Transport.h" -#include "Transport_Mux_Strategy.h" -#include "LF_Strategy.h" -#include "Request_Dispatcher.h" -#include "Codeset_Manager.h" -#include "SystemException.h" +#include "tao/GIOP_Message_Base.h" +#include "tao/operation_details.h" +#include "tao/debug.h" +#include "tao/ORB_Core.h" +#include "tao/TAO_Server_Request.h" +#include "tao/GIOP_Message_Locate_Header.h" +#include "tao/Transport.h" +#include "tao/Transport_Mux_Strategy.h" +#include "tao/LF_Strategy.h" +#include "tao/Request_Dispatcher.h" +#include "tao/Codeset_Manager.h" +#include "tao/SystemException.h" /* * Hook to add additional include files during specializations. @@ -22,22 +22,27 @@ ACE_RCSID (tao, GIOP_Message_Base, "$Id$") -TAO_GIOP_Message_Base::TAO_GIOP_Message_Base (TAO_ORB_Core *orb_core, - size_t /*input_cdr_size*/) +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +TAO_GIOP_Message_Base::TAO_GIOP_Message_Base (TAO_ORB_Core * orb_core, + TAO_Transport * transport, + size_t /* input_cdr_size */) : orb_core_ (orb_core) - , message_state_ () - , out_stream_ (this->buffer_, - sizeof this->buffer_, /* ACE_CDR::DEFAULT_BUFSIZE */ - TAO_ENCAP_BYTE_ORDER, - orb_core->output_cdr_buffer_allocator (), - orb_core->output_cdr_dblock_allocator (), - orb_core->output_cdr_msgblock_allocator (), - orb_core->orb_params ()->cdr_memcpy_tradeoff (), - TAO_DEF_GIOP_MAJOR, - TAO_DEF_GIOP_MINOR) + , message_state_ () + , fragmentation_strategy_ (orb_core->fragmentation_strategy (transport)) + , out_stream_ (this->buffer_, + sizeof this->buffer_, /* ACE_CDR::DEFAULT_BUFSIZE */ + TAO_ENCAP_BYTE_ORDER, + orb_core->output_cdr_buffer_allocator (), + orb_core->output_cdr_dblock_allocator (), + orb_core->output_cdr_msgblock_allocator (), + orb_core->orb_params ()->cdr_memcpy_tradeoff (), + fragmentation_strategy_.get (), + TAO_DEF_GIOP_MAJOR, + TAO_DEF_GIOP_MINOR) { #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE) - ACE_OS::memset(buffer_, 0, sizeof (buffer_)); + ACE_OS::memset (this->buffer_, 0, sizeof (buffer_)); #endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */ } @@ -196,7 +201,7 @@ TAO_GIOP_Message_Base::generate_reply_header ( ACE_TRY { // Now call the implementation for the rest of the header - int result = + int const result = generator_parser->write_reply_header (cdr, params ACE_ENV_ARG_PARAMETER); @@ -216,7 +221,7 @@ TAO_GIOP_Message_Base::generate_reply_header ( { if (TAO_debug_level > 4) ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, - "TAO_GIOP_Message_Base::generate_reply_header"); + ACE_TEXT ("TAO_GIOP_Message_Base::generate_reply_header")); return -1; } @@ -225,12 +230,39 @@ TAO_GIOP_Message_Base::generate_reply_header ( return 0; } - int -TAO_GIOP_Message_Base::read_message (TAO_Transport * /*transport*/, - int /*block */, - ACE_Time_Value * /*max_wait_time*/) +TAO_GIOP_Message_Base::generate_fragment_header (TAO_OutputCDR & cdr, + CORBA::ULong request_id) { + // Get a parser for us + TAO_GIOP_Message_Generator_Parser *generator_parser = 0; + + CORBA::Octet major, minor; + + cdr.get_version (major, minor); + + // GIOP fragments are supported in GIOP 1.1 and better, but TAO only + // supports them in 1.2 or better since GIOP 1.1 fragments do not + // have a fragment message header. + if (major == 1 && minor < 2) + return -1; + + // Get the state information that we need to use + this->set_state (major, + minor, + generator_parser); + + // Write the GIOP header first + if (!this->write_protocol_header (TAO_GIOP_FRAGMENT, cdr) + || !generator_parser->write_fragment_header (cdr, request_id)) + { + if (TAO_debug_level) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%P|%t) Error in writing GIOP header \n"))); + + return -1; + } + return 0; } @@ -238,11 +270,12 @@ int TAO_GIOP_Message_Base::format_message (TAO_OutputCDR &stream) { // Ptr to first buffer. - char *buf = (char *) stream.buffer (); + char * buf = (char *) stream.buffer (); + + this->set_giop_flags (stream); // Length of all buffers. - size_t total_len = - stream.total_length (); + size_t const total_len = stream.total_length (); // NOTE: Here would also be a fine place to calculate a digital // signature for the message and place it into a preallocated slot @@ -251,7 +284,7 @@ TAO_GIOP_Message_Base::format_message (TAO_OutputCDR &stream) // this particular environment and that isn't handled by the // networking infrastructure (e.g., IPSEC). - CORBA::ULong bodylen = static_cast <CORBA::ULong> + CORBA::ULong const bodylen = static_cast <CORBA::ULong> (total_len - TAO_GIOP_MESSAGE_HEADER_LEN); #if !defined (ACE_ENABLE_SWAP_ON_WRITE) @@ -307,6 +340,7 @@ TAO_GIOP_Message_Base::message_type ( case TAO_GIOP_LOCATEREPLY: return TAO_PLUGGABLE_MESSAGE_LOCATEREPLY; + case TAO_GIOP_REPLY: return TAO_PLUGGABLE_MESSAGE_REPLY; @@ -317,13 +351,19 @@ TAO_GIOP_Message_Base::message_type ( return TAO_PLUGGABLE_MESSAGE_FRAGMENT; case TAO_GIOP_MESSAGERROR: + return TAO_PLUGGABLE_MESSAGE_MESSAGERROR; + case TAO_GIOP_CANCELREQUEST: - // Does it happen? why?? + return TAO_PLUGGABLE_MESSAGE_CANCELREQUEST; + default: + if (TAO_debug_level > 0) + { ACE_ERROR ((LM_ERROR, ACE_TEXT ("TAO (%P|%t) %N:%l message_type : ") ACE_TEXT ("wrong message.\n"))); } + } return TAO_PLUGGABLE_MESSAGE_MESSAGERROR; } @@ -331,31 +371,50 @@ TAO_GIOP_Message_Base::message_type ( int TAO_GIOP_Message_Base::parse_incoming_messages (ACE_Message_Block &incoming) { + this->message_state_.reset (); + return this->message_state_.parse_message_header (incoming); } -ssize_t -TAO_GIOP_Message_Base::missing_data (ACE_Message_Block &incoming) +int +TAO_GIOP_Message_Base::parse_next_message (ACE_Message_Block &incoming, + TAO_Queued_Data &qd, + size_t &mesg_length) { - // Actual message size including the header.. - CORBA::ULong msg_size = - this->message_state_.message_size (); - - size_t len = incoming.length (); - - // If we have too many messages or if we have less than even a size - // of the GIOP header then .. - if (len > msg_size || - len < TAO_GIOP_MESSAGE_HEADER_LEN) + if (incoming.length () < TAO_GIOP_MESSAGE_HEADER_LEN) { - return -1; + qd.missing_data_ = TAO_MISSING_DATA_UNDEFINED; + + return 0; /* incomplete header */ } - else if (len == msg_size) - return 0; + else + { + TAO_GIOP_Message_State state; - return msg_size - len; -} + if (state.parse_message_header (incoming) == -1) + { + return -1; + } + + const size_t message_size = state.message_size (); /* Header + Payload */ + + if (message_size > incoming.length ()) + { + qd.missing_data_ = message_size - incoming.length (); + } + else + { + qd.missing_data_ = 0; + } + + /* init out-parameters */ + this->init_queued_data (&qd, state); + mesg_length = TAO_GIOP_MESSAGE_HEADER_LEN + + state.payload_size (); + return 1; /* complete header */ + } +} int TAO_GIOP_Message_Base::extract_next_message (ACE_Message_Block &incoming, @@ -365,15 +424,45 @@ TAO_GIOP_Message_Base::extract_next_message (ACE_Message_Block &incoming, { if (incoming.length () > 0) { - // Make a node which has a message block of the size of - // MESSAGE_HEADER_LEN. - qd = - this->make_queued_data (TAO_GIOP_MESSAGE_HEADER_LEN); + // Optimize memory usage, we dont know actual message size + // so far, but allocate enough space to hold small GIOP + // messages. This way we avoid expensive "grow" operation + // for small messages. + const size_t default_buf_size = ACE_CDR::DEFAULT_BUFSIZE; + + // Make a node which has at least message block of the size + // of MESSAGE_HEADER_LEN. + const size_t buf_size = ace_max (TAO_GIOP_MESSAGE_HEADER_LEN, + default_buf_size); + + // POST: buf_size >= TAO_GIOP_MESSAGE_HEADER_LEN + + qd = this->make_queued_data (buf_size); + + if (qd == 0) + { + if (TAO_debug_level > 0) + { + ACE_ERROR((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::extract_next_message, ") + ACE_TEXT ("out of memory\n"))); + } + return -1; + } qd->msg_block_->copy (incoming.rd_ptr (), incoming.length ()); - qd->missing_data_ = -1; + + incoming.rd_ptr (incoming.length ()); // consume all available data + + qd->missing_data_ = TAO_MISSING_DATA_UNDEFINED; + } + else + { + // handle not initialized variables + qd = 0; // reset } + return 0; } @@ -387,12 +476,26 @@ TAO_GIOP_Message_Base::extract_next_message (ACE_Message_Block &incoming, qd = this->make_queued_data (copying_len); + if (qd == 0) + { + if (TAO_debug_level > 0) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::extract_next_message, ") + ACE_TEXT ("out of memory\n"))); + } + return -1; + } + if (copying_len > incoming.length ()) { qd->missing_data_ = copying_len - incoming.length (); - copying_len = incoming.length (); } + else + { + qd->missing_data_ = 0; + } qd->msg_block_->copy (incoming.rd_ptr (), copying_len); @@ -408,35 +511,71 @@ TAO_GIOP_Message_Base::consolidate_node (TAO_Queued_Data *qd, ACE_Message_Block &incoming) { // Look to see whether we had atleast parsed the GIOP header ... - if (qd->missing_data_ == -1) + if (qd->missing_data_ == TAO_MISSING_DATA_UNDEFINED) { // The data length that has been stuck in there during the last // read .... - size_t len = + size_t const len = qd->msg_block_->length (); + // paranoid check + if (len >= TAO_GIOP_MESSAGE_HEADER_LEN) + { + // inconsistency - this code should have parsed the header + // so far + return -1; + } + // We know that we would have space for // TAO_GIOP_MESSAGE_HEADER_LEN here. So copy that much of data // from the <incoming> into the message block in <qd> - qd->msg_block_->copy (incoming.rd_ptr (), - TAO_GIOP_MESSAGE_HEADER_LEN - len); + const size_t available = incoming.length (); + const size_t desired = TAO_GIOP_MESSAGE_HEADER_LEN - len; + const size_t n_copy = ace_min (available, desired); + + // paranoid check, but would cause endless looping + if (n_copy == 0) + { + return -1; + } + + if (qd->msg_block_->copy (incoming.rd_ptr (), + n_copy) == -1) + { + return -1; + } // Move the rd_ptr () in the incoming message block.. - incoming.rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN - len); + incoming.rd_ptr (n_copy); + + // verify sufficient data to parse GIOP header + if (qd->msg_block_->length () < TAO_GIOP_MESSAGE_HEADER_LEN) + { + return 0; /* continue */ + } TAO_GIOP_Message_State state; // Parse the message header now... if (state.parse_message_header (*qd->msg_block_) == -1) - return -1; - - // Now grow the message block so that we can copy the rest of - // the data... - if (qd->msg_block_->space () < state.message_size ()) { - ACE_CDR::grow (qd->msg_block_, - state.message_size ()); + if (TAO_debug_level > 0) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::consolidate_node, ") + ACE_TEXT ("error parsing header\n") )); + } + return -1; } + // Now grow the message block so that we can copy the rest of + // the data, the message_block must be able to hold complete message + if (ACE_CDR::grow (qd->msg_block_, + state.message_size ()) == -1) /* GIOP_Header + Payload */ + { + // on mem-error get rid of context silently, try to avoid + // system calls that might allocate additional memory + return -1; + } // Copy the pay load.. // Calculate the bytes that needs to be copied in the queue... @@ -459,8 +598,11 @@ TAO_GIOP_Message_Base::consolidate_node (TAO_Queued_Data *qd, // ..now we are set to copy the right amount of data to the // node.. - qd->msg_block_->copy (incoming.rd_ptr (), - copy_len); + if (qd->msg_block_->copy (incoming.rd_ptr (), + copy_len) == -1) + { + return -1; + } // Set the <rd_ptr> of the <incoming>.. incoming.rd_ptr (copy_len); @@ -482,10 +624,19 @@ TAO_GIOP_Message_Base::consolidate_node (TAO_Queued_Data *qd, copy_len = incoming.length (); } + // paranoid check for endless-event-looping + if (copy_len == 0) + { + return -1; + } + // Copy the right amount of data in to the node... // node.. - qd->msg_block_->copy (incoming.rd_ptr (), - copy_len); + if (qd->msg_block_->copy (incoming.rd_ptr (), + copy_len) == -1) + { + return -1; + } // Set the <rd_ptr> of the <incoming>.. qd->msg_block_->rd_ptr (copy_len); @@ -495,16 +646,6 @@ TAO_GIOP_Message_Base::consolidate_node (TAO_Queued_Data *qd, return 0; } -void -TAO_GIOP_Message_Base::get_message_data (TAO_Queued_Data *qd) -{ - // Get the message information - this->init_queued_data (qd, this->message_state_); - - // Reset the message_state - this->message_state_.reset (); -} - int TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport, TAO_Queued_Data *qd) @@ -544,12 +685,13 @@ TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport, this->orb_core_->input_cdr_dblock_allocator (), this->orb_core_->input_cdr_msgblock_allocator (), this->orb_core_->orb_params ()->cdr_memcpy_tradeoff (), + this->fragmentation_strategy_.get (), qd->major_version_, qd->minor_version_); // Get the read and write positions before we steal data. size_t rd_pos = qd->msg_block_->rd_ptr () - qd->msg_block_->base (); - size_t wr_pos = qd->msg_block_->wr_ptr () - qd->msg_block_->base (); + size_t const wr_pos = qd->msg_block_->wr_ptr () - qd->msg_block_->base (); rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN; if (TAO_debug_level > 0) @@ -642,7 +784,7 @@ TAO_GIOP_Message_Base::process_reply_message ( // Get the read and write positions before we steal data. size_t rd_pos = qd->msg_block_->rd_ptr () - qd->msg_block_->base (); - size_t wr_pos = qd->msg_block_->wr_ptr () - qd->msg_block_->base (); + size_t const wr_pos = qd->msg_block_->wr_ptr () - qd->msg_block_->base (); rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN; if (TAO_debug_level > 0) @@ -704,8 +846,8 @@ TAO_GIOP_Message_Base::process_reply_message ( // every reply on this connection. if (TAO_debug_level > 0) ACE_ERROR ((LM_ERROR, - "TAO (%P|%t) - GIOP_Message_Base[%d]::process_parsed_messages, " - "dispatch reply failed\n", + ACE_TEXT ("TAO (%P|%t) - GIOP_Message_Base[%d]::process_parsed_messages, ") + ACE_TEXT ("dispatch reply failed\n"), params.transport_->id ())); } @@ -738,7 +880,8 @@ TAO_GIOP_Message_Base::generate_exception_reply ( // happened -> no hope, close connection. // Close the handle. - ACE_DEBUG ((LM_DEBUG, + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t|%N|%l) cannot marshal exception, ") ACE_TEXT ("generate_exception_reply ()\n"))); return -1; @@ -754,34 +897,36 @@ TAO_GIOP_Message_Base::write_protocol_header (TAO_GIOP_Message_Type type, TAO_OutputCDR &msg) { // Reset the message type - // Reset the message type msg.reset (); CORBA::Octet header[12] = - { - // The following works on non-ASCII platforms, such as MVS (which - // uses EBCDIC). - 0x47, // 'G' - 0x49, // 'I' - 0x4f, // 'O' - 0x50 // 'P' - }; + { + // The following works on non-ASCII platforms, such as MVS (which + // uses EBCDIC). + 0x47, // 'G' + 0x49, // 'I' + 0x4f, // 'O' + 0x50 // 'P' + }; CORBA::Octet major, minor = 0; - msg.get_version (major, minor); + + (void) msg.get_version (major, minor); header[4] = major; header[5] = minor; - // We are putting the byte order. But at a later date if we support - // fragmentation and when we want to use the other 6 bits in this - // octet we can have a virtual function do this for us as the - // version info , Bala - header[6] = (TAO_ENCAP_BYTE_ORDER ^ msg.do_byte_swap ()); + // "flags" octet, i.e. header[6] will be set up later when message + // is formatted by the transport. + + header[7] = CORBA::Octet (type); // Message type - header[7] = CORBA::Octet(type); + static ACE_CDR::ULong const header_size = + sizeof (header) / sizeof (header[0]); - static int header_size = sizeof (header) / sizeof (header[0]); + // Fragmentation should not occur at this point since there are only + // 12 bytes in the stream, and fragmentation may only occur when + // the stream length >= 16. msg.write_octet_array (header, header_size); return msg.good_bit (); @@ -830,11 +975,11 @@ TAO_GIOP_Message_Base::process_request (TAO_Transport *transport, CORBA::Object_var forward_to; /* - * Hook to specialize request processing within TAO + * Hook to specialize request processing within TAO * This hook will be replaced by specialized request * processing implementation. */ -//@@ TAO_DISPATCH_RESOLUTION_OPT_COMMENT_HOOK_START +//@@ TAO_DISPATCH_RESOLUTION_OPT_COMMENT_HOOK_START // Do this before the reply is sent. this->orb_core_->request_dispatcher ()->dispatch ( @@ -848,15 +993,28 @@ TAO_GIOP_Message_Base::process_request (TAO_Transport *transport, if (!CORBA::is_nil (forward_to.in ())) { + const CORBA::Boolean permanent_forward_condition = + this->orb_core_->is_permanent_forward_condition + (forward_to.in (), + request.request_service_context ()); + // We should forward to another object... TAO_Pluggable_Reply_Params_Base reply_params; reply_params.request_id_ = request_id; - reply_params.reply_status_ = TAO_GIOP_LOCATION_FORWARD; + reply_params.reply_status_ = + permanent_forward_condition + ? TAO_GIOP_LOCATION_FORWARD_PERM + : TAO_GIOP_LOCATION_FORWARD; reply_params.svc_ctx_.length (0); // Send back the reply service context. reply_params.service_context_notowned (&request.reply_service_info ()); + output.message_attributes (request_id, + 0, + TAO_Transport::TAO_REPLY, + 0); + // Make the GIOP header and Reply header this->generate_reply_header (output, reply_params); @@ -871,6 +1029,8 @@ TAO_GIOP_Message_Base::process_request (TAO_Transport *transport, return -1; } + output.more_fragments (false); + int result = transport->send_message (output, 0, TAO_Transport::TAO_REPLY); @@ -1150,6 +1310,8 @@ TAO_GIOP_Message_Base::make_send_locate_reply (TAO_Transport *transport, request.request_id (), status_info); + output.more_fragments (false); + // Send the message int result = transport->send_message (output, 0, @@ -1323,8 +1485,8 @@ TAO_GIOP_Message_Base:: { if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) TAO_GIOP_Message_Base::send_close_connection -" - " connection already closed\n")); + ACE_TEXT ("TAO (%P|%t) TAO_GIOP_Message_Base::send_close_connection -") + ACE_TEXT (" connection already closed\n"))); return; } #endif @@ -1345,14 +1507,14 @@ TAO_GIOP_Message_Base:: { if (TAO_debug_level > 0) ACE_ERROR ((LM_ERROR, - "(%P|%t) error closing connection %u, errno = %d\n", - transport->id (), errno)); + ACE_TEXT ("(%P|%t) error closing connection %u, errno = %d\n"), + transport->id (), errno)); } transport->close_connection (); ACE_DEBUG ((LM_DEBUG, - "(%P|%t) shut down transport, handle %d\n", - transport-> id ())); + ACE_TEXT ("(%P|%t) shut down transport, handle %d\n"), + transport-> id ())); } @@ -1389,6 +1551,8 @@ TAO_GIOP_Message_Base::send_reply_exception ( *x) == -1) return -1; + output.more_fragments (false); + return transport->send_message (output, 0, TAO_Transport::TAO_REPLY); @@ -1434,7 +1598,8 @@ TAO_GIOP_Message_Base::dump_msg (const char *label, char *tmp_id = 0; if (ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == TAO_GIOP_REQUEST || - ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == TAO_GIOP_REPLY) + ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == TAO_GIOP_REPLY || + ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == TAO_GIOP_FRAGMENT) { if (major == 1 && minor < 2) { @@ -1446,16 +1611,16 @@ TAO_GIOP_Message_Base::dump_msg (const char *label, tmp_id = (char * ) (ptr + TAO_GIOP_MESSAGE_HEADER_LEN); } #if !defined (ACE_DISABLE_SWAP_ON_READ) - if (byte_order == TAO_ENCAP_BYTE_ORDER) - { - id = reinterpret_cast <ACE_CDR::ULong*> (tmp_id); - } - else - { - ACE_CDR::swap_4 (tmp_id, reinterpret_cast <char*> (id)); - } + if (byte_order == TAO_ENCAP_BYTE_ORDER) + { + id = reinterpret_cast <ACE_CDR::ULong*> (tmp_id); + } + else + { + ACE_CDR::swap_4 (tmp_id, reinterpret_cast <char*> (id)); + } #else - id = reinterpret_cast <ACE_CDR::ULong*> (tmp_id); + id = reinterpret_cast <ACE_CDR::ULong*> (tmp_id); #endif /* ACE_DISABLE_SWAP_ON_READ */ } @@ -1520,6 +1685,17 @@ TAO_GIOP_Message_Base::make_queued_data (size_t sz) TAO_Queued_Data::make_queued_data ( this->orb_core_->transport_message_buffer_allocator ()); + if (qd == 0) + { + if (TAO_debug_level > 0) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::make_queued_data, ") + ACE_TEXT ("our of memory, failed to allocate queued data object\n"))); + } + return 0; // NULL pointer + } + // @@todo: We have a similar method in Transport.cpp. Need to see how // we can factor them out.. // Make a datablock for the size requested + something. The @@ -1531,20 +1707,58 @@ TAO_GIOP_Message_Base::make_queued_data (size_t sz) this->orb_core_->create_input_cdr_data_block (sz + ACE_CDR::MAX_ALIGNMENT); + if (db == 0) + { + TAO_Queued_Data::release (qd); + + if (TAO_debug_level > 0) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::make_queued_data, ") + ACE_TEXT ("out of memory, failed to allocate input data block of size %u\n"), + sz)); + } + return 0; // NULL pointer + } + ACE_Allocator *alloc = this->orb_core_->input_cdr_msgblock_allocator (); + if (alloc == 0) + { + if (TAO_debug_level >= 8) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) - TAO_GIOP_Message_Base::make_queued_data,") + ACE_TEXT (" no ACE_Allocator defined\n"))); + } + } + + ACE_Message_Block mb (db, 0, alloc); ACE_Message_Block *new_mb = mb.duplicate (); + if (new_mb == 0) + { + TAO_Queued_Data::release (qd); + db->release(); + + if (TAO_debug_level > 0) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::make_queued_data, ") + ACE_TEXT ("out of memory, failed to allocate message block\n"))); + } + return 0; + } + ACE_CDR::mb_align (new_mb); qd->msg_block_ = new_mb; - return qd; } @@ -1570,13 +1784,369 @@ TAO_GIOP_Message_Base::fragment_header_length (CORBA::Octet major, void TAO_GIOP_Message_Base::init_queued_data ( - TAO_Queued_Data* qd, - const TAO_GIOP_Message_State& state) const + TAO_Queued_Data* qd, + const TAO_GIOP_Message_State& state) const { qd->byte_order_ = state.byte_order_; qd->major_version_ = state.giop_version_.major; qd->minor_version_ = state.giop_version_.minor; qd->more_fragments_ = state.more_fragments_; - qd->request_id_ = state.request_id_; qd->msg_type_ = this->message_type (state); } + +int +TAO_GIOP_Message_Base::parse_request_id (const TAO_Queued_Data *qd, CORBA::ULong &request_id) const +{ + // Get a parser for us + TAO_GIOP_Message_Generator_Parser *generator_parser = 0; + + // Get the state information that we need to use + this->set_state (qd->major_version_, + qd->minor_version_, + generator_parser); + + // Get the read and write positions before we steal data. + size_t rd_pos = qd->msg_block_->rd_ptr () - qd->msg_block_->base (); + size_t wr_pos = qd->msg_block_->wr_ptr () - qd->msg_block_->base (); + rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN; + + // Create a input CDR stream. We do the following + // 1 - If the incoming message block has a data block with a flag + // DONT_DELETE (for the data block) we create an input CDR + // stream the same way. + // 2 - If the incoming message block had a datablock from heap just + // use it by duplicating it and make the flag 0. + // NOTE: We use the same data block in which we read the message and + // we pass it on to the higher layers of the ORB. So we dont to any + // copies at all here. The same is also done in the higher layers. + + ACE_Message_Block::Message_Flags flg = 0; + ACE_Data_Block *db = 0; + + // Get the flag in the message block + flg = qd->msg_block_->self_flags (); + + if (ACE_BIT_ENABLED (flg, + ACE_Message_Block::DONT_DELETE)) + { + // Use the same datablock + db = qd->msg_block_->data_block (); + } + else + { + // Use a duplicated datablock as the datablock has come off the + // heap. + db = qd->msg_block_->data_block ()->duplicate (); + } + + + TAO_InputCDR input_cdr (db, + flg, + rd_pos, + wr_pos, + qd->byte_order_, + qd->major_version_, + qd->minor_version_, + this->orb_core_); + + if (qd->major_version_ >= 1 && + (qd->minor_version_ == 0 || qd->minor_version_ == 1)) + { + if (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_REQUEST || + qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_REPLY) + { + IOP::ServiceContextList service_context; + + if ( ! (input_cdr >> service_context && + input_cdr >> request_id) ) + { + return -1; + } + + return 0; + } + else if (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_CANCELREQUEST || + qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST || + qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_LOCATEREPLY) + { + if ( ! (input_cdr >> request_id) ) + { + return -1; + } + + return 0; + } + else + { + return -1; + } + } + else + { + if (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_REQUEST || + qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_REPLY || + qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT || + qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_CANCELREQUEST || + qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST || + qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_LOCATEREPLY) + { + // Dealing with GIOP-1.2, the request-id is located directly behind the GIOP-Header. + // This is true for all message types that might be sent in form of fragments or cancel-requests. + if ( ! (input_cdr >> request_id) ) + { + return -1; + } + + return 0; + } + else + { + return -1; + } + } + + return -1; +} + +/* @return -1 error, 0 ok, +1 outstanding fragments */ +int +TAO_GIOP_Message_Base::consolidate_fragmented_message (TAO_Queued_Data *qd, TAO_Queued_Data *&msg) +{ + TAO::Incoming_Message_Stack reverse_stack; + + TAO_Queued_Data *tail = 0; + TAO_Queued_Data *head = 0; + + // + // CONSOLIDATE FRAGMENTED MESSAGE + // + + // check for error-condition + if (qd == 0) + { + return -1; + } + + if (qd->major_version_ == 1 && qd->minor_version_ == 0) + { + TAO_Queued_Data::release (qd); + return -1; // error: GIOP-1.0 does not support fragments + } + + // If this is not the last fragment, push it onto stack for later processing + if (qd->more_fragments_) + { + this->fragment_stack_.push (qd); + + msg = 0; // no consolidated message available yet + return 1; // status: more messages expected. + } + + tail = qd; // init + + // Add the current message block to the end of the chain + // after adjusting the read pointer to skip the header(s) + const size_t header_adjustment = + this->header_length () + + this->fragment_header_length (tail->major_version_, + tail->minor_version_); + + if (tail->msg_block_->length () < header_adjustment) + { + // buffer length not sufficient + TAO_Queued_Data::release (qd); + return -1; + } + + // duplicate code to speed up both processes, for GIOP-1.1 and GIOP-1.2 + if (tail->major_version_ == 1 && tail->minor_version_ == 1) + { + // GIOP-1.1 + + while (this->fragment_stack_.pop (head) != -1) + { + if (head->more_fragments_ && + head->major_version_ == 1 && + head->minor_version_ == 1 && + head->msg_block_->length () >= header_adjustment) + { + // adjust the read-pointer, skip the fragment header + tail->msg_block_->rd_ptr(header_adjustment); + + head->msg_block_->cont (tail->msg_block_); + + tail->msg_block_ = 0; + + TAO_Queued_Data::release (tail); + + tail = head; + } + else + { + reverse_stack.push (head); + } + } + } + else + { + // > GIOP-1.2 + + CORBA::ULong tmp_request_id = 0; + if (this->parse_request_id (tail, tmp_request_id) == -1) + { + return -1; + } + + const CORBA::ULong request_id = tmp_request_id; + + while (this->fragment_stack_.pop (head) != -1) + { + CORBA::ULong head_request_id = 0; + int parse_status = 0; + + if (head->more_fragments_ && + head->major_version_ >= 1 && + head->minor_version_ >= 2 && + head->msg_block_->length () >= header_adjustment && + (parse_status = this->parse_request_id (head, head_request_id)) != -1 && + request_id == head_request_id) + { + // adjust the read-pointer, skip the fragment header + tail->msg_block_->rd_ptr(header_adjustment); + + head->msg_block_->cont (tail->msg_block_); + + tail->msg_block_ = 0; + + TAO_Queued_Data::release (tail); + + tail = head; + } + else + { + if (parse_status == -1) + { + TAO_Queued_Data::release (head); + return -1; + } + + reverse_stack.push (head); + } + } + } + + // restore stack + while (reverse_stack.pop (head) != -1) + { + this->fragment_stack_.push (head); + } + + if (tail->consolidate () == -1) + { + // memory allocation failed + TAO_Queued_Data::release (tail); + return -1; + } + + // set out value + msg = tail; + + return 0; +} + + +int +TAO_GIOP_Message_Base::discard_fragmented_message (const TAO_Queued_Data *cancel_request) +{ + // We must extract the specific request-id from message-buffer + // and remove all fragments from stack that match this request-id. + + TAO::Incoming_Message_Stack reverse_stack; + + CORBA::ULong cancel_request_id; + + if (this->parse_request_id (cancel_request, cancel_request_id) == -1) + { + return -1; + } + + TAO_Queued_Data *head = 0; + + // Revert stack + while (this->fragment_stack_.pop (head) != -1) + { + reverse_stack.push (head); + } + + bool discard_all_GIOP11_messages = false; + + // Now we are able to process message in order they have arrived. + // If the cancel_request_id matches to GIOP-1.1 message, all succeeding + // fragments belong to this message and must be discarded. + // Note: GIOP-1.1 fragment header dont have any request-id encoded. If the + // cancel_request_id matches GIOP-1.2 messages, all GIOP-1.2 fragments + // having encoded the request id will be discarded. + while (reverse_stack.pop (head) != -1) + { + CORBA::ULong head_request_id; + + if (head->major_version_ == 1 && + head->minor_version_ <= 1 && + head->msg_type_ != TAO_PLUGGABLE_MESSAGE_FRAGMENT && // GIOP11 fragment does not provide request id + this->parse_request_id (head, head_request_id) >= 0 && + cancel_request_id == head_request_id) + { + TAO_Queued_Data::release (head); + discard_all_GIOP11_messages = true; + } + else if (head->major_version_ == 1 && + head->minor_version_ <= 1 && + discard_all_GIOP11_messages) + { + TAO_Queued_Data::release (head); + } + else if (head->major_version_ >= 1 && + head->minor_version_ >= 2 && + this->parse_request_id (head, head_request_id) >= 0 && + cancel_request_id == head_request_id) + { + TAO_Queued_Data::release (head); + } + else + { + this->fragment_stack_.push (head); + } + } + + return 0; +} + +TAO_GIOP_Fragmentation_Strategy * +TAO_GIOP_Message_Base::fragmentation_strategy (void) +{ + return this->fragmentation_strategy_.get (); +} + +void +TAO_GIOP_Message_Base::set_giop_flags (TAO_OutputCDR & msg) const +{ + CORBA::Octet * const buf = + reinterpret_cast<CORBA::Octet *> (const_cast<char *> (msg.buffer ())); + + CORBA::Octet const & major = buf[TAO_GIOP_VERSION_MAJOR_OFFSET]; + CORBA::Octet const & minor = buf[TAO_GIOP_VERSION_MINOR_OFFSET]; + + // Flags for the GIOP protocol header "flags" field. + CORBA::Octet & flags = buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET]; + + // Least significant bit: Byte order + ACE_SET_BITS (flags, TAO_ENCAP_BYTE_ORDER ^ msg.do_byte_swap ()); + + // Second least significant bit: More fragments + // + // Only supported in GIOP 1.1 or better. + if (!(major <= 1 && minor == 0)) + ACE_SET_BITS (flags, msg.more_fragments () << 1); +} + +TAO_END_VERSIONED_NAMESPACE_DECL |