diff options
Diffstat (limited to 'TAO/tao/GIOP_Message_Base.cpp')
-rw-r--r-- | TAO/tao/GIOP_Message_Base.cpp | 2099 |
1 files changed, 2099 insertions, 0 deletions
diff --git a/TAO/tao/GIOP_Message_Base.cpp b/TAO/tao/GIOP_Message_Base.cpp new file mode 100644 index 00000000000..5262caa29f6 --- /dev/null +++ b/TAO/tao/GIOP_Message_Base.cpp @@ -0,0 +1,2099 @@ +// $Id$ + +#include "tao/GIOP_Message_Base.h" +#include "tao/operation_details.h" +#include "tao/debug.h" +#include "tao/ORB_Core.h" +#include "tao/TAO_Server_Request.h" +#include "tao/GIOP_Message_Locate_Header.h" +#include "tao/Transport.h" +#include "tao/Transport_Mux_Strategy.h" +#include "tao/LF_Strategy.h" +#include "tao/Request_Dispatcher.h" +#include "tao/Codeset_Manager.h" +#include "tao/SystemException.h" +#include "ace/Min_Max.h" + +/* + * Hook to add additional include files during specializations. + */ +//@@ GIOP_MESSAGE_BASE_INCLUDE_ADD_HOOK + +ACE_RCSID (tao, + GIOP_Message_Base, + "$Id$") + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +TAO_GIOP_Message_Base::TAO_GIOP_Message_Base (TAO_ORB_Core * orb_core, + TAO_Transport * transport, + size_t input_cdr_size) + : orb_core_ (orb_core) + , message_state_ () + , fragmentation_strategy_ (orb_core->fragmentation_strategy (transport)) + , out_stream_ (0, + input_cdr_size, + TAO_ENCAP_BYTE_ORDER, + orb_core->output_cdr_buffer_allocator (), + orb_core->output_cdr_dblock_allocator (), + orb_core->output_cdr_msgblock_allocator (), + orb_core->orb_params ()->cdr_memcpy_tradeoff (), + fragmentation_strategy_.get (), + TAO_DEF_GIOP_MAJOR, + TAO_DEF_GIOP_MINOR) +{ +} + + +TAO_GIOP_Message_Base::~TAO_GIOP_Message_Base (void) +{ +} + + +void +TAO_GIOP_Message_Base::init (CORBA::Octet major, + CORBA::Octet minor) +{ + // Set the giop version of the out stream + this->out_stream_.set_version (major, + minor); +} + +TAO_OutputCDR & +TAO_GIOP_Message_Base::out_stream (void) +{ + return this->out_stream_; +} + +void +TAO_GIOP_Message_Base::reset (void) +{ + // no-op +} + +int +TAO_GIOP_Message_Base::generate_request_header ( + TAO_Operation_Details &op, + TAO_Target_Specification &spec, + TAO_OutputCDR &cdr + ) +{ + // Get a parser for us + TAO_GIOP_Message_Generator_Parser *generator_parser = 0; + + CORBA::Octet major, minor; + + cdr.get_version (major, minor); + + // Get the state information that we need to use + this->set_state (major, + minor, + generator_parser); + + // Write the GIOP header first + if (!this->write_protocol_header (TAO_GIOP_REQUEST, + cdr)) + { + if (TAO_debug_level) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%P|%t) Error in writing GIOP header \n"))); + } + + return -1; + } + + // Now call the implementation for the rest of the header + if (!generator_parser->write_request_header (op, + spec, + cdr)) + { + if (TAO_debug_level) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%P|%t) Error in writing request header \n"))); + + return -1; + } + + return 0; +} + +int +TAO_GIOP_Message_Base::generate_locate_request_header ( + TAO_Operation_Details &op, + TAO_Target_Specification &spec, + TAO_OutputCDR &cdr + ) +{ + // Get a parser for us + TAO_GIOP_Message_Generator_Parser *generator_parser = 0; + + CORBA::Octet major, minor; + + cdr.get_version (major, minor); + + // Get the state information that we need to use + this->set_state (major, + minor, + generator_parser); + + // Write the GIOP header first + if (!this->write_protocol_header (TAO_GIOP_LOCATEREQUEST, + cdr)) + { + if (TAO_debug_level) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%P|%t) Error in writing GIOP header \n"))); + + return -1; + } + + // Now call the implementation for the rest of the header + if (!generator_parser->write_locate_request_header + (op.request_id (), + spec, + cdr)) + { + if (TAO_debug_level) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%P|%t) Error in writing locate request header \n"))); + + + return -1; + + } + + return 0; +} + +int +TAO_GIOP_Message_Base::generate_reply_header ( + TAO_OutputCDR &cdr, + TAO_Pluggable_Reply_Params_Base ¶ms + ) +{ + // Get a parser for us + TAO_GIOP_Message_Generator_Parser *generator_parser = 0; + + CORBA::Octet major, minor; + + cdr.get_version (major, minor); + + // Get the state information that we need to use + this->set_state (major, + minor, + generator_parser); + + // Write the GIOP header first + if (!this->write_protocol_header (TAO_GIOP_REPLY, + cdr)) + { + if (TAO_debug_level) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%P|%t) Error in writing GIOP header \n"))); + + return -1; + } + + ACE_DECLARE_NEW_CORBA_ENV; + ACE_TRY + { + // Now call the implementation for the rest of the header + int const result = + generator_parser->write_reply_header (cdr, + params + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + if (!result) + { + if (TAO_debug_level > 4) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%P|%t) Error in writing reply ") + ACE_TEXT ("header\n"))); + + return -1; + } + } + ACE_CATCHANY + { + if (TAO_debug_level > 4) + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, + ACE_TEXT ("TAO_GIOP_Message_Base::generate_reply_header")); + + return -1; + } + ACE_ENDTRY; + + return 0; +} + +int +TAO_GIOP_Message_Base::generate_fragment_header (TAO_OutputCDR & cdr, + CORBA::ULong request_id) +{ + // Get a parser for us + TAO_GIOP_Message_Generator_Parser *generator_parser = 0; + + CORBA::Octet major, minor; + + cdr.get_version (major, minor); + + // GIOP fragments are supported in GIOP 1.1 and better, but TAO only + // supports them in 1.2 or better since GIOP 1.1 fragments do not + // have a fragment message header. + if (major == 1 && minor < 2) + return -1; + + // Get the state information that we need to use + this->set_state (major, + minor, + generator_parser); + + // Write the GIOP header first + if (!this->write_protocol_header (TAO_GIOP_FRAGMENT, cdr) + || !generator_parser->write_fragment_header (cdr, request_id)) + { + if (TAO_debug_level) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%P|%t) Error in writing GIOP header \n"))); + + return -1; + } + + return 0; +} + +int +TAO_GIOP_Message_Base::format_message (TAO_OutputCDR &stream) +{ + // Ptr to first buffer. + char * buf = (char *) stream.buffer (); + + this->set_giop_flags (stream); + + // Length of all buffers. + size_t const total_len = stream.total_length (); + + // NOTE: Here would also be a fine place to calculate a digital + // signature for the message and place it into a preallocated slot + // in the "ServiceContext". Similarly, this is a good spot to + // encrypt messages (or just the message bodies) if that's needed in + // this particular environment and that isn't handled by the + // networking infrastructure (e.g., IPSEC). + + CORBA::ULong const bodylen = static_cast <CORBA::ULong> + (total_len - TAO_GIOP_MESSAGE_HEADER_LEN); + +#if !defined (ACE_ENABLE_SWAP_ON_WRITE) + *(reinterpret_cast <CORBA::ULong *> (buf + + TAO_GIOP_MESSAGE_SIZE_OFFSET)) = bodylen; +#else + if (!stream.do_byte_swap ()) + *(reinterpret_cast <CORBA::ULong *> + (buf + TAO_GIOP_MESSAGE_SIZE_OFFSET)) = bodylen; + else + ACE_CDR::swap_4 (reinterpret_cast <char *> (&bodylen), + buf + TAO_GIOP_MESSAGE_SIZE_OFFSET); +#endif /* ACE_ENABLE_SWAP_ON_WRITE */ + + if (TAO_debug_level > 2) + { + // Check whether the output cdr stream is build up of multiple + // messageblocks. If so, consolidate them to one block that can be + // dumped + ACE_Message_Block* consolidated_block = 0; + if (stream.begin()->cont () != 0) + { + ACE_NEW_RETURN (consolidated_block, + ACE_Message_Block, + 0); + ACE_CDR::consolidate (consolidated_block, stream.begin ()); + buf = (char *) (consolidated_block->rd_ptr ()); + } + /// + this->dump_msg ("send", + reinterpret_cast <u_char *> (buf), + total_len); + + // + delete consolidated_block; + consolidated_block = 0; + // + } + + return 0; +} + +TAO_Pluggable_Message_Type +TAO_GIOP_Message_Base::message_type ( + const TAO_GIOP_Message_State &msg_state) const +{ + // Convert to the right type of Pluggable Messaging message type. + + switch (msg_state.message_type_) + { + case TAO_GIOP_REQUEST: + return TAO_PLUGGABLE_MESSAGE_REQUEST; + case TAO_GIOP_LOCATEREQUEST: + return TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST; + + case TAO_GIOP_LOCATEREPLY: + return TAO_PLUGGABLE_MESSAGE_LOCATEREPLY; + + case TAO_GIOP_REPLY: + return TAO_PLUGGABLE_MESSAGE_REPLY; + + case TAO_GIOP_CLOSECONNECTION: + return TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION; + + case TAO_GIOP_FRAGMENT: + return TAO_PLUGGABLE_MESSAGE_FRAGMENT; + + case TAO_GIOP_MESSAGERROR: + return TAO_PLUGGABLE_MESSAGE_MESSAGERROR; + + case TAO_GIOP_CANCELREQUEST: + return TAO_PLUGGABLE_MESSAGE_CANCELREQUEST; + + default: + if (TAO_debug_level > 0) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) %N:%l message_type : ") + ACE_TEXT ("wrong message.\n"))); + } + } + + return TAO_PLUGGABLE_MESSAGE_MESSAGERROR; +} + +int +TAO_GIOP_Message_Base::parse_incoming_messages (ACE_Message_Block &incoming) +{ + this->message_state_.reset (); + + return this->message_state_.parse_message_header (incoming); +} + +int +TAO_GIOP_Message_Base::parse_next_message (ACE_Message_Block &incoming, + TAO_Queued_Data &qd, + size_t &mesg_length) +{ + if (incoming.length () < TAO_GIOP_MESSAGE_HEADER_LEN) + { + qd.missing_data_ = TAO_MISSING_DATA_UNDEFINED; + + return 0; /* incomplete header */ + } + else + { + TAO_GIOP_Message_State state; + + if (state.parse_message_header (incoming) == -1) + { + return -1; + } + + const size_t message_size = state.message_size (); /* Header + Payload */ + + if (message_size > incoming.length ()) + { + qd.missing_data_ = message_size - incoming.length (); + } + else + { + qd.missing_data_ = 0; + } + + /* init out-parameters */ + this->init_queued_data (&qd, state); + mesg_length = TAO_GIOP_MESSAGE_HEADER_LEN + + state.payload_size (); + + return 1; /* complete header */ + } +} + +int +TAO_GIOP_Message_Base::extract_next_message (ACE_Message_Block &incoming, + TAO_Queued_Data *&qd) +{ + if (incoming.length () < TAO_GIOP_MESSAGE_HEADER_LEN) + { + if (incoming.length () > 0) + { + // Optimize memory usage, we dont know actual message size + // so far, but allocate enough space to hold small GIOP + // messages. This way we avoid expensive "grow" operation + // for small messages. + size_t const default_buf_size = ACE_CDR::DEFAULT_BUFSIZE; + + // Make a node which has at least message block of the size + // of MESSAGE_HEADER_LEN. + size_t const buf_size = ace_max (TAO_GIOP_MESSAGE_HEADER_LEN, + default_buf_size); + + // POST: buf_size >= TAO_GIOP_MESSAGE_HEADER_LEN + + qd = this->make_queued_data (buf_size); + + if (qd == 0) + { + if (TAO_debug_level > 0) + { + ACE_ERROR((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::extract_next_message, ") + ACE_TEXT ("out of memory\n"))); + } + return -1; + } + + qd->msg_block_->copy (incoming.rd_ptr (), + incoming.length ()); + + incoming.rd_ptr (incoming.length ()); // consume all available data + + qd->missing_data_ = TAO_MISSING_DATA_UNDEFINED; + } + else + { + // handle not initialized variables + qd = 0; // reset + } + + return 0; + } + + TAO_GIOP_Message_State state; + if (state.parse_message_header (incoming) == -1) + { + return -1; + } + + size_t copying_len = state.message_size (); + + qd = this->make_queued_data (copying_len); + + if (qd == 0) + { + if (TAO_debug_level > 0) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::extract_next_message, ") + ACE_TEXT ("out of memory\n"))); + } + return -1; + } + + if (copying_len > incoming.length ()) + { + qd->missing_data_ = copying_len - incoming.length (); + copying_len = incoming.length (); + } + else + { + qd->missing_data_ = 0; + } + + qd->msg_block_->copy (incoming.rd_ptr (), + copying_len); + + incoming.rd_ptr (copying_len); + this->init_queued_data (qd, state); + + return 1; +} + +int +TAO_GIOP_Message_Base::consolidate_node (TAO_Queued_Data *qd, + ACE_Message_Block &incoming) +{ + // Look to see whether we had atleast parsed the GIOP header ... + if (qd->missing_data_ == TAO_MISSING_DATA_UNDEFINED) + { + // The data length that has been stuck in there during the last + // read .... + size_t const len = + qd->msg_block_->length (); + + // paranoid check + if (len >= TAO_GIOP_MESSAGE_HEADER_LEN) + { + // inconsistency - this code should have parsed the header + // so far + return -1; + } + + // We know that we would have space for + // TAO_GIOP_MESSAGE_HEADER_LEN here. So copy that much of data + // from the <incoming> into the message block in <qd> + size_t const available = incoming.length (); + size_t const desired = TAO_GIOP_MESSAGE_HEADER_LEN - len; + size_t const n_copy = ace_min (available, desired); + + // paranoid check, but would cause endless looping + if (n_copy == 0) + { + return -1; + } + + if (qd->msg_block_->copy (incoming.rd_ptr (), + n_copy) == -1) + { + return -1; + } + + // Move the rd_ptr () in the incoming message block.. + incoming.rd_ptr (n_copy); + + // verify sufficient data to parse GIOP header + if (qd->msg_block_->length () < TAO_GIOP_MESSAGE_HEADER_LEN) + { + return 0; /* continue */ + } + + TAO_GIOP_Message_State state; + + // Parse the message header now... + if (state.parse_message_header (*qd->msg_block_) == -1) + { + if (TAO_debug_level > 0) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::consolidate_node, ") + ACE_TEXT ("error parsing header\n") )); + } + return -1; + } + // Now grow the message block so that we can copy the rest of + // the data, the message_block must be able to hold complete message + if (ACE_CDR::grow (qd->msg_block_, + state.message_size ()) == -1) /* GIOP_Header + Payload */ + { + // on mem-error get rid of context silently, try to avoid + // system calls that might allocate additional memory + return -1; + } + + // Copy the pay load.. + // Calculate the bytes that needs to be copied in the queue... + size_t copy_len = state.payload_size (); + + // If the data that needs to be copied is more than that is + // available to us .. + if (copy_len > incoming.length ()) + { + // Calculate the missing data.. + qd->missing_data_ = copy_len - incoming.length (); + + // Set the actual possible copy_len that is available... + copy_len = incoming.length (); + } + else + { + qd->missing_data_ = 0; + } + + // ..now we are set to copy the right amount of data to the + // node.. + if (qd->msg_block_->copy (incoming.rd_ptr (), + copy_len) == -1) + { + return -1; + } + + // Set the <rd_ptr> of the <incoming>.. + incoming.rd_ptr (copy_len); + + // Get the other details... + this->init_queued_data (qd, state); + } + else + { + // @@todo: Need to abstract this out to a seperate method... + size_t copy_len = qd->missing_data_; + + if (copy_len > incoming.length ()) + { + // Calculate the missing data.. + qd->missing_data_ = copy_len - incoming.length (); + + // Set the actual possible copy_len that is available... + copy_len = incoming.length (); + } + + // paranoid check for endless-event-looping + if (copy_len == 0) + { + return -1; + } + + // Copy the right amount of data in to the node... + // node.. + if (qd->msg_block_->copy (incoming.rd_ptr (), + copy_len) == -1) + { + return -1; + } + + // Set the <rd_ptr> of the <incoming>.. + qd->msg_block_->rd_ptr (copy_len); + + } + + return 0; +} + +int +TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport, + TAO_Queued_Data *qd) +{ + // Set the upcall thread + this->orb_core_->lf_strategy ().set_upcall_thread (this->orb_core_->leader_follower ()); + + // Get a parser for us + TAO_GIOP_Message_Generator_Parser *generator_parser = 0; + + // Get the state information that we need to use + this->set_state (qd->major_version_, + qd->minor_version_, + generator_parser); + + // A buffer that we will use to initialise the CDR stream. Since we're + // allocating the buffer on the stack, we may as well allocate the data + // block on the stack too and avoid an allocation inside the message + // block of the CDR. +#if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE) + char repbuf[ACE_CDR::DEFAULT_BUFSIZE] = { 0 }; +#else + char repbuf[ACE_CDR::DEFAULT_BUFSIZE]; +#endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */ + ACE_Data_Block out_db (sizeof (repbuf), + ACE_Message_Block::MB_DATA, + repbuf, + this->orb_core_->input_cdr_buffer_allocator (), + 0, + ACE_Message_Block::DONT_DELETE, + this->orb_core_->input_cdr_dblock_allocator ()); + + // Initialize an output CDR on the stack + // NOTE: Don't jump to a conclusion as to why we are using the + // input_cdr and hence the global pool here. These pools will move + // to the lanes anyway at some point of time. Further, it would have + // been awesome to have this in TSS. But for some reason the cloning + // that happens when the ORB gets flow controlled while writing a + // reply is messing things up. We crash horribly. Doing this adds a + // lock, we need to set things like this -- put stuff in TSS here + // and transfer to global memory when we get flow controlled. We + // need to work on the message block to get it right! + TAO_OutputCDR output (&out_db, + TAO_ENCAP_BYTE_ORDER, + this->orb_core_->input_cdr_msgblock_allocator (), + this->orb_core_->orb_params ()->cdr_memcpy_tradeoff (), + this->fragmentation_strategy_.get (), + qd->major_version_, + qd->minor_version_); + + // Get the read and write positions before we steal data. + size_t rd_pos = qd->msg_block_->rd_ptr () - qd->msg_block_->base (); + size_t const wr_pos = qd->msg_block_->wr_ptr () - qd->msg_block_->base (); + rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN; + + if (TAO_debug_level > 0) + this->dump_msg ("recv", + reinterpret_cast <u_char *> (qd->msg_block_->rd_ptr ()), + qd->msg_block_->length ()); + + + // Create a input CDR stream. We do the following + // 1 - If the incoming message block has a data block with a flag + // DONT_DELETE (for the data block) we create an input CDR + // stream the same way. + // 2 - If the incoming message block had a datablock from heap just + // use it by duplicating it and make the flag 0. + // NOTE: We use the same data block in which we read the message and + // we pass it on to the higher layers of the ORB. So we dont to any + // copies at all here. The same is also done in the higher layers. + + ACE_Message_Block::Message_Flags flg = 0; + ACE_Data_Block *db = 0; + + // Get the flag in the message block + flg = qd->msg_block_->self_flags (); + + if (ACE_BIT_ENABLED (flg, + ACE_Message_Block::DONT_DELETE)) + { + // Use the same datablock + db = qd->msg_block_->data_block (); + } + else + { + // Use a duplicated datablock as the datablock has come off the + // heap. + db = qd->msg_block_->data_block ()->duplicate (); + } + + + TAO_InputCDR input_cdr (db, + flg, + rd_pos, + wr_pos, + qd->byte_order_, + qd->major_version_, + qd->minor_version_, + this->orb_core_); + + transport->assign_translators(&input_cdr,&output); + + // We know we have some request message. Check whether it is a + // GIOP_REQUEST or GIOP_LOCATE_REQUEST to take action. + + // Once we send the InputCDR stream we need to just forget about + // the stream and never touch that again for anything. We basically + // loose ownership of the data_block. + + switch (qd->msg_type_) + { + case TAO_PLUGGABLE_MESSAGE_REQUEST: + // Should be taken care by the state specific invocations. They + // could raise an exception or write things in the output CDR + // stream + return this->process_request (transport, + input_cdr, + output, + generator_parser); + + case TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST: + return this->process_locate_request (transport, + input_cdr, + output, + generator_parser); + default: + return -1; + } +} + +int +TAO_GIOP_Message_Base::process_reply_message ( + TAO_Pluggable_Reply_Params ¶ms, + TAO_Queued_Data *qd) +{ + // Get a parser for us + TAO_GIOP_Message_Generator_Parser *generator_parser = 0; + + // Get the state information that we need to use + this->set_state (qd->major_version_, + qd->minor_version_, + generator_parser); + + // Get the read and write positions before we steal data. + size_t rd_pos = qd->msg_block_->rd_ptr () - qd->msg_block_->base (); + size_t const wr_pos = qd->msg_block_->wr_ptr () - qd->msg_block_->base (); + rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN; + + if (TAO_debug_level > 0) + this->dump_msg ("recv", + reinterpret_cast <u_char *> (qd->msg_block_->rd_ptr ()), + qd->msg_block_->length ()); + + + // Create a empty buffer on stack + // NOTE: We use the same data block in which we read the message and + // we pass it on to the higher layers of the ORB. So we dont to any + // copies at all here. + TAO_InputCDR input_cdr (qd->msg_block_->data_block (), + ACE_Message_Block::DONT_DELETE, + rd_pos, + wr_pos, + qd->byte_order_, + qd->major_version_, + qd->minor_version_, + this->orb_core_); + + // We know we have some reply message. Check whether it is a + // GIOP_REPLY or GIOP_LOCATE_REPLY to take action. + + // Once we send the InputCDR stream we need to just forget about + // the stream and never touch that again for anything. We basically + // loose ownership of the data_block. + int retval = 0; + + switch (qd->msg_type_) + { + case TAO_PLUGGABLE_MESSAGE_REPLY: + // Should be taken care by the state specific parsing + retval = + generator_parser->parse_reply (input_cdr, + params); + + break; + case TAO_PLUGGABLE_MESSAGE_LOCATEREPLY: + retval = + generator_parser->parse_locate_reply (input_cdr, + params); + break; + default: + retval = -1; + } + + if (retval == -1) + return retval; + + params.input_cdr_ = &input_cdr; + + retval = + params.transport_->tms ()->dispatch_reply (params); + + if (retval == -1) + { + // Something really critical happened, we will forget about + // every reply on this connection. + if (TAO_debug_level > 0) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - GIOP_Message_Base[%d]::process_reply_message, ") + ACE_TEXT ("dispatch reply failed\n"), + params.transport_->id ())); + } + + return retval; +} + +int +TAO_GIOP_Message_Base::generate_exception_reply ( + TAO_OutputCDR &cdr, + TAO_Pluggable_Reply_Params_Base ¶ms, + CORBA::Exception &x + ) +{ + // A new try/catch block, but if something goes wrong now we have no + // hope, just abort. + ACE_DECLARE_NEW_CORBA_ENV; + + ACE_TRY + { + // Make the GIOP & reply header. + this->generate_reply_header (cdr, + params); + x._tao_encode (cdr + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_CATCH (CORBA::Exception, ex) + { + // Now we know that while handling the error an other error + // happened -> no hope, close connection. + + // Close the handle. + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t|%N|%l) cannot marshal exception, ") + ACE_TEXT ("generate_exception_reply ()\n"))); + return -1; + } + ACE_ENDTRY; + ACE_CHECK_RETURN (-1); + + return 0; +} + +int +TAO_GIOP_Message_Base::write_protocol_header (TAO_GIOP_Message_Type type, + TAO_OutputCDR &msg) +{ + // Reset the message type + msg.reset (); + + CORBA::Octet header[12] = + { + // The following works on non-ASCII platforms, such as MVS (which + // uses EBCDIC). + 0x47, // 'G' + 0x49, // 'I' + 0x4f, // 'O' + 0x50 // 'P' + }; + + CORBA::Octet major, minor = 0; + + (void) msg.get_version (major, minor); + + header[4] = major; + header[5] = minor; + + // "flags" octet, i.e. header[6] will be set up later when message + // is formatted by the transport. + + header[7] = CORBA::Octet (type); // Message type + + static ACE_CDR::ULong const header_size = + sizeof (header) / sizeof (header[0]); + + // Fragmentation should not occur at this point since there are only + // 12 bytes in the stream, and fragmentation may only occur when + // the stream length >= 16. + msg.write_octet_array (header, header_size); + + return msg.good_bit (); +} + +int +TAO_GIOP_Message_Base::process_request ( + TAO_Transport * transport, + TAO_InputCDR & cdr, + TAO_OutputCDR & output, + TAO_GIOP_Message_Generator_Parser * parser) +{ + // This will extract the request header, set <response_required> + // and <sync_with_server> as appropriate. + TAO_ServerRequest request (this, + cdr, + output, + transport, + this->orb_core_); + + CORBA::ULong request_id = 0; + CORBA::Boolean response_required = false; + + int parse_error = 0; + + ACE_DECLARE_NEW_CORBA_ENV; + ACE_TRY + { + parse_error = + parser->parse_request_header (request); + + // Throw an exception if the + if (parse_error != 0) + ACE_TRY_THROW (CORBA::MARSHAL (0, + CORBA::COMPLETED_NO)); + + TAO_Codeset_Manager *csm = request.orb_core ()->codeset_manager (); + if (csm) + { + csm->process_service_context (request); + transport->assign_translators (&cdr, &output); + } + + request_id = request.request_id (); + + response_required = request.response_expected (); + + CORBA::Object_var forward_to; + + /* + * Hook to specialize request processing within TAO + * This hook will be replaced by specialized request + * processing implementation. + */ +//@@ TAO_DISPATCH_RESOLUTION_OPT_COMMENT_HOOK_START + + // Do this before the reply is sent. + this->orb_core_->request_dispatcher ()->dispatch ( + this->orb_core_, + request, + forward_to + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + +//@@ TAO_DISPATCH_RESOLUTION_OPT_COMMENT_HOOK_END + + if (!CORBA::is_nil (forward_to.in ())) + { + CORBA::Boolean const permanent_forward_condition = + this->orb_core_->is_permanent_forward_condition + (forward_to.in (), + request.request_service_context ()); + + // We should forward to another object... + TAO_Pluggable_Reply_Params_Base reply_params; + reply_params.request_id_ = request_id; + reply_params.reply_status_ = + permanent_forward_condition + ? TAO_GIOP_LOCATION_FORWARD_PERM + : TAO_GIOP_LOCATION_FORWARD; + reply_params.svc_ctx_.length (0); + + // Send back the reply service context. + reply_params.service_context_notowned ( + &request.reply_service_info ()); + + output.message_attributes (request_id, + 0, + TAO_Transport::TAO_REPLY, + 0); + + // Make the GIOP header and Reply header + this->generate_reply_header (output, reply_params); + + if (!(output << forward_to.in ())) + { + if (TAO_debug_level > 0) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) ERROR: Unable to marshal ") + ACE_TEXT ("forward reference.\n"))); + + return -1; + } + + output.more_fragments (false); + + int result = transport->send_message (output, + 0, + TAO_Transport::TAO_REPLY); + if (result == -1) + { + if (TAO_debug_level > 0) + { + // No exception but some kind of error, yet a + // response is required. + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("TAO: (%P|%t|%N|%l) %p: ") + ACE_TEXT ("cannot send reply\n"), + ACE_TEXT ("TAO_GIOP_Message_Base::process_request"))); + } + } + return result; + } + } + // Only CORBA exceptions are caught here. + ACE_CATCHANY + { + int result = 0; + + if (response_required) + { + result = this->send_reply_exception (transport, + output, + request_id, + &request.reply_service_info (), + &ACE_ANY_EXCEPTION); + if (result == -1) + { + if (TAO_debug_level > 0) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("TAO: (%P|%t|%N|%l) %p: ") + ACE_TEXT ("cannot send exception\n"), + ACE_TEXT ("process_connector_request ()"))); + + ACE_PRINT_EXCEPTION ( + ACE_ANY_EXCEPTION, + "TAO_GIOP_Message_Base::process_request[1]"); + } + } + + } + else if (TAO_debug_level > 0) + { + // It is unfortunate that an exception (probably a system + // exception) was thrown by the upcall code (even by the + // user) when the client was not expecting a response. + // However, in this case, we cannot close the connection + // down, since it really isn't the client's fault. + + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%P|%t) exception thrown ") + ACE_TEXT ("but client is not waiting a response\n"))); + + ACE_PRINT_EXCEPTION ( + ACE_ANY_EXCEPTION, + "TAO_GIOP_Message_Base::process_request[2]"); + } + + return result; + } +#if defined (TAO_HAS_EXCEPTIONS) + ACE_CATCHALL + { + // @@ TODO some c++ exception or another, but what do we do with + // it? + // We are supposed to map it into a CORBA::UNKNOWN exception. + // BTW, this cannot be detected if using the <env> mapping. If + // we have native exceptions but no support for them in the ORB + // we should still be able to catch it. If we don't have native + // exceptions it couldn't have been raised in the first place! + int result = 0; + + if (response_required) + { + CORBA::UNKNOWN exception (CORBA::SystemException::_tao_minor_code + (TAO_UNHANDLED_SERVER_CXX_EXCEPTION, 0), + CORBA::COMPLETED_MAYBE); + + result = this->send_reply_exception (transport, + output, + request_id, + &request.reply_service_info (), + &exception); + if (result == -1) + { + if (TAO_debug_level > 0) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_request[3], ") + ACE_TEXT ("%p: ") + ACE_TEXT ("cannot send exception\n"), + ACE_TEXT ("process_request ()"))); + ACE_PRINT_EXCEPTION ( + exception, + "TAO_GIOP_Message_Base::process_request[3]"); + } + } + } + else if (TAO_debug_level > 0) + { + // It is unfotunate that an exception (probably a system + // exception) was thrown by the upcall code (even by the + // user) when the client was not expecting a response. + // However, in this case, we cannot close the connection + // down, since it really isn't the client's fault. + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%P|%t|%N|%l) exception thrown ") + ACE_TEXT ("but client is not waiting a response\n"))); + } + + return result; + } +#endif /* TAO_HAS_EXCEPTIONS */ + ACE_ENDTRY; + + return 0; +} + + +int +TAO_GIOP_Message_Base::process_locate_request (TAO_Transport *transport, + TAO_InputCDR &input, + TAO_OutputCDR &output, + TAO_GIOP_Message_Generator_Parser *parser) +{ + // This will extract the request header, set <response_required> as + // appropriate. + TAO_GIOP_Locate_Request_Header locate_request (input, + this->orb_core_); + + TAO_GIOP_Locate_Status_Msg status_info; + + // Defaulting. + status_info.status = TAO_GIOP_UNKNOWN_OBJECT; + + CORBA::Boolean response_required = true; + + ACE_DECLARE_NEW_CORBA_ENV; + ACE_TRY + { + int parse_error = + parser->parse_locate_header (locate_request); + + if (parse_error != 0) + { + ACE_TRY_THROW (CORBA::MARSHAL (0, + CORBA::COMPLETED_NO)); + } + + TAO::ObjectKey tmp_key (locate_request.object_key ().length (), + locate_request.object_key ().length (), + locate_request.object_key ().get_buffer (), + 0); + + // Set it to an error state + parse_error = 1; + CORBA::ULong req_id = locate_request.request_id (); + + // We will send the reply. The ServerRequest class need not send + // the reply + CORBA::Boolean deferred_reply = true; + TAO_ServerRequest server_request (this, + req_id, + response_required, + deferred_reply, + tmp_key, + "_non_existent", + output, + transport, + this->orb_core_, + parse_error); + + if (parse_error != 0) + { + ACE_TRY_THROW (CORBA::MARSHAL (0, + CORBA::COMPLETED_NO)); + } + + CORBA::Object_var forward_to; + + this->orb_core_->request_dispatcher ()->dispatch ( + this->orb_core_, + server_request, + forward_to + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + if (!CORBA::is_nil (forward_to.in ())) + { + status_info.status = TAO_GIOP_OBJECT_FORWARD; + status_info.forward_location_var = forward_to; + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ") + ACE_TEXT ("called: forwarding\n"))); + } + else if (server_request.exception_type () == TAO_GIOP_NO_EXCEPTION) + { + // We got no exception, so the object is here. + status_info.status = TAO_GIOP_OBJECT_HERE; + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ") + ACE_TEXT ("found\n"))); + } + else + { + status_info.forward_location_var = server_request.forward_location (); + + if (!CORBA::is_nil (status_info.forward_location_var.in ())) + { + status_info.status = TAO_GIOP_OBJECT_FORWARD; + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ") + ACE_TEXT ("forwarding\n"))); + } + else + { + // Normal exception, so the object is not here + status_info.status = TAO_GIOP_UNKNOWN_OBJECT; + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ") + ACE_TEXT ("not here\n"))); + } + } + } + + ACE_CATCHANY + { + // Normal exception, so the object is not here + status_info.status = TAO_GIOP_UNKNOWN_OBJECT; + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ") + ACE_TEXT ("CORBA exception raised\n"))); + } +#if defined (TAO_HAS_EXCEPTIONS) + ACE_CATCHALL + { + // Normal exception, so the object is not here + status_info.status = TAO_GIOP_UNKNOWN_OBJECT; + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) TAO_GIOP_Message_Base::process_locate_request - ") + ACE_TEXT ("C++ exception raised\n"))); + } +#endif /* TAO_HAS_EXCEPTIONS */ + ACE_ENDTRY; + + return this->make_send_locate_reply (transport, + locate_request, + status_info, + output, + parser); +} + +int +TAO_GIOP_Message_Base::make_send_locate_reply (TAO_Transport *transport, + TAO_GIOP_Locate_Request_Header &request, + TAO_GIOP_Locate_Status_Msg &status_info, + TAO_OutputCDR &output, + TAO_GIOP_Message_Generator_Parser *parser) +{ + // Note here we are making the Locate reply header which is *QUITE* + // different from the reply header made by the make_reply () call.. + // Make the GIOP message header + this->write_protocol_header (TAO_GIOP_LOCATEREPLY, + output); + + // This writes the header & body + parser->write_locate_reply_mesg (output, + request.request_id (), + status_info); + + output.more_fragments (false); + + // Send the message + int result = transport->send_message (output, + 0, + TAO_Transport::TAO_REPLY); + + // Print out message if there is an error + if (result == -1) + { + if (TAO_debug_level > 0) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("TAO: (%P|%t) %p: cannot send reply\n"), + ACE_TEXT ("TAO_GIOP_Message_Base::make_send_locate_reply"))); + } + } + + return result; +} + +// Send an "I can't understand you" message -- again, the message is +// prefabricated for simplicity. This implies abortive disconnect (at +// the application level, if not at the level of TCP). +// +// NOTE that IIOP will still benefit from TCP's orderly disconnect. +int +TAO_GIOP_Message_Base::send_error (TAO_Transport *transport) +{ + const char + error_message [TAO_GIOP_MESSAGE_HEADER_LEN] = + { + // The following works on non-ASCII platforms, such as MVS (which + // uses EBCDIC). + 0x47, // 'G' + 0x49, // 'I' + 0x4f, // 'O' + 0x50, // 'P' + (CORBA::Octet) 1, // Use the lowest GIOP version + (CORBA::Octet) 0, + TAO_ENCAP_BYTE_ORDER, + TAO_GIOP_MESSAGERROR, + 0, 0, 0, 0 + }; + + // @@ Q: How does this works with GIOP lite? + // A: It doesn't + + this->dump_msg ("send_error", + (const u_char *) error_message, + TAO_GIOP_MESSAGE_HEADER_LEN); + + ACE_Data_Block data_block (TAO_GIOP_MESSAGE_HEADER_LEN, + ACE_Message_Block::MB_DATA, + error_message, + 0, + 0, + ACE_Message_Block::DONT_DELETE, + 0); + ACE_Message_Block message_block(&data_block, + ACE_Message_Block::DONT_DELETE); + message_block.wr_ptr (TAO_GIOP_MESSAGE_HEADER_LEN); + + size_t bt; + int result = transport->send_message_block_chain (&message_block, bt); + if (result == -1) + { + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%N|%l|%P|%t) error sending error to transport %u\n"), + transport->id ())); + } + + return result; +} + +void +TAO_GIOP_Message_Base::set_state ( + CORBA::Octet def_major, + CORBA::Octet def_minor, + TAO_GIOP_Message_Generator_Parser *&gen_parser) const +{ + switch (def_major) + { + case 1: + switch (def_minor) + { + case 0: + gen_parser = + const_cast<TAO_GIOP_Message_Generator_Parser_10 *> ( + &this->tao_giop_impl_.tao_giop_10); + break; + case 1: + gen_parser = + const_cast<TAO_GIOP_Message_Generator_Parser_11 *> ( + &this->tao_giop_impl_.tao_giop_11); + break; + case 2: + gen_parser = + const_cast<TAO_GIOP_Message_Generator_Parser_12 *> ( + &this->tao_giop_impl_.tao_giop_12); + break; + default: + break; + } + break; + default: + break; + } +} + + +// Server sends an "I'm shutting down now, any requests you've sent me +// can be retried" message to the server. The message is prefab, for +// simplicity. +// +// NOTE: this is IIOP-specific though it doesn't look like it is. It +// relies on a TCP-ism: orderly disconnect, which doesn't exist in all +// transport protocols. Versions of GIOP atop some transport that's +// lacking orderly disconnect must define some transport-specific +// handshaking (e.g. the XNS/SPP handshake convention) in order to +// know that the same transport semantics are provided when shutdown +// is begun with messages "in flight". (IIOP doesn't report false +// errors in the case of "clean shutdown", because it relies on +// orderly disconnect as provided by TCP. This quality of service is +// required to write robust distributed systems.) + +void +TAO_GIOP_Message_Base:: + send_close_connection (const TAO_GIOP_Message_Version &version, + TAO_Transport *transport, + void *) +{ + + // static CORBA::Octet + // I hate this in every method. Till the time I figure out a way + // around I will have them here hanging around. + const char close_message [TAO_GIOP_MESSAGE_HEADER_LEN] = + { + // The following works on non-ASCII platforms, such as MVS (which + // uses EBCDIC). + 0x47, // 'G' + 0x49, // 'I' + 0x4f, // 'O' + 0x50, // 'P' + version.major, + version.minor, + TAO_ENCAP_BYTE_ORDER, + TAO_GIOP_CLOSECONNECTION, + 0, 0, 0, 0 + }; + + // It's important that we use a reliable shutdown after we send this + // message, so we know it's received. + // + // @@ should recv and discard queued data for portability; note + // that this won't block (long) since we never set SO_LINGER + + this->dump_msg ("send_close_connection", + (const u_char *) close_message, + TAO_GIOP_MESSAGE_HEADER_LEN); + +#if 0 + // @@CJC I don't think we need this check b/c the transport's send() + // will simply return -1. However, I guess we could create something + // like TAO_Tranport::is_closed() that returns whether the connection + // is already closed. The problem with that, however, is that it's + // entirely possible that is_closed() could return TRUE, and then the + // transport could get closed down btw. the time it gets called and the + // time that the send actually occurs. + ACE_HANDLE which = transport->handle (); + if (which == ACE_INVALID_HANDLE) + { + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) TAO_GIOP_Message_Base::send_close_connection -") + ACE_TEXT (" connection already closed\n"))); + return; + } +#endif + + ACE_Data_Block data_block (TAO_GIOP_MESSAGE_HEADER_LEN, + ACE_Message_Block::MB_DATA, + close_message, + 0, + 0, + ACE_Message_Block::DONT_DELETE, + 0); + ACE_Message_Block message_block(&data_block); + message_block.wr_ptr (TAO_GIOP_MESSAGE_HEADER_LEN); + + size_t bt; + int result = transport->send_message_block_chain (&message_block, bt); + if (result == -1) + { + if (TAO_debug_level > 0) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%P|%t) error closing connection %u, errno = %d\n"), + transport->id (), errno)); + } + + transport->close_connection (); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) shut down transport, handle %d\n"), + transport-> id ())); + +} + + +int +TAO_GIOP_Message_Base::send_reply_exception ( + TAO_Transport *transport, + TAO_OutputCDR &output, + CORBA::ULong request_id, + IOP::ServiceContextList *svc_info, + CORBA::Exception *x + ) +{ + TAO_Pluggable_Reply_Params_Base reply_params; + reply_params.request_id_ = request_id; + reply_params.svc_ctx_.length (0); + + // We are going to send some data + reply_params.argument_flag_ = 1; + + // Send back the service context we received. (RTCORBA relies on + // this). + reply_params.service_context_notowned (svc_info); + + reply_params.reply_status_ = TAO_GIOP_USER_EXCEPTION; + + if (CORBA::SystemException::_downcast (x) != 0) + { + reply_params.reply_status_ = TAO_GIOP_SYSTEM_EXCEPTION; + } + + if (this->generate_exception_reply (output, + reply_params, + *x) == -1) + return -1; + + output.more_fragments (false); + + return transport->send_message (output, + 0, + TAO_Transport::TAO_REPLY); +} + +void +TAO_GIOP_Message_Base::dump_msg (const char *label, + const u_char *ptr, + size_t len) +{ + + if (TAO_debug_level >= 5) + { + static const char digits[] = "0123456789ABCD"; + static const char *names[] = + { + "Request", + "Reply", + "CancelRequest", + "LocateRequest", + "LocateReply", + "CloseConnection", + "MessageError", + "Fragment" + }; + + // Message name. + const char *message_name = "UNKNOWN MESSAGE"; + u_long slot = ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET]; + if (slot < sizeof (names) / sizeof (names[0])) + message_name = names[slot]; + + // Byte order. + int byte_order = ptr[TAO_GIOP_MESSAGE_FLAGS_OFFSET] & 0x01; + + // Get the version info + CORBA::Octet major = ptr[TAO_GIOP_VERSION_MAJOR_OFFSET]; + CORBA::Octet minor = ptr[TAO_GIOP_VERSION_MINOR_OFFSET]; + + // request/reply id. + CORBA::ULong tmp = 0; + CORBA::ULong *id = &tmp; + char *tmp_id = 0; + + if (ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == TAO_GIOP_REQUEST || + ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == TAO_GIOP_REPLY || + ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == TAO_GIOP_FRAGMENT) + { + if (major == 1 && minor < 2) + { + // @@ Only works if ServiceContextList is empty.... + tmp_id = (char * ) (ptr + TAO_GIOP_MESSAGE_HEADER_LEN + 4); + } + else + { + tmp_id = (char * ) (ptr + TAO_GIOP_MESSAGE_HEADER_LEN); + } +#if !defined (ACE_DISABLE_SWAP_ON_READ) + if (byte_order == TAO_ENCAP_BYTE_ORDER) + { + id = reinterpret_cast <ACE_CDR::ULong*> (tmp_id); + } + else + { + ACE_CDR::swap_4 (tmp_id, reinterpret_cast <char*> (id)); + } +#else + id = reinterpret_cast <ACE_CDR::ULong*> (tmp_id); +#endif /* ACE_DISABLE_SWAP_ON_READ */ + + } + + // Print. + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - GIOP_Message_Base::dump_msg, " + "%s GIOP v%c.%c msg, %d data bytes, %s endian, " + "Type %s[%u]\n", + ACE_TEXT_CHAR_TO_TCHAR (label), + digits[ptr[TAO_GIOP_VERSION_MAJOR_OFFSET]], + digits[ptr[TAO_GIOP_VERSION_MINOR_OFFSET]], + len - TAO_GIOP_MESSAGE_HEADER_LEN , + (byte_order == TAO_ENCAP_BYTE_ORDER) ? ACE_TEXT("my") : ACE_TEXT("other"), + ACE_TEXT_CHAR_TO_TCHAR(message_name), + *id)); + + if (TAO_debug_level >= 10) + ACE_HEX_DUMP ((LM_DEBUG, + (const char *) ptr, + len, + ACE_TEXT ("GIOP message"))); + } +} + +int +TAO_GIOP_Message_Base::generate_locate_reply_header ( + TAO_OutputCDR & /*cdr*/, + TAO_Pluggable_Reply_Params_Base & /*params*/) +{ + return 0; +} + +int +TAO_GIOP_Message_Base::is_ready_for_bidirectional (TAO_OutputCDR &msg) +{ + // Get a parser for us + TAO_GIOP_Message_Generator_Parser *parser = 0; + + CORBA::Octet major, minor = 0; + + msg.get_version (major, minor); + + // Get the state information that we need to use + this->set_state (major, + minor, + parser); + + // We dont really know.. So ask the generator and parser objects that + // we know. + // @@ TODO: Need to make this faster, instead of making virtual + // call, try todo the check within this class + return parser->is_ready_for_bidirectional (); +} + + +TAO_Queued_Data * +TAO_GIOP_Message_Base::make_queued_data (size_t sz) +{ + // Make a datablock for the size requested + something. The + // "something" is required because we are going to align the data + // block in the message block. During alignment we could loose some + // bytes. As we may not know how many bytes will be lost, we will + // allocate ACE_CDR::MAX_ALIGNMENT extra. + ACE_Data_Block *db = + this->orb_core_->create_input_cdr_data_block (sz + + ACE_CDR::MAX_ALIGNMENT); + + TAO_Queued_Data *qd = + TAO_Queued_Data::make_queued_data ( + this->orb_core_->transport_message_buffer_allocator (), + this->orb_core_->input_cdr_msgblock_allocator (), + db); + + if (qd == 0) + { + if (TAO_debug_level > 0) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::make_queued_data, ") + ACE_TEXT ("out of memory, failed to allocate queued data object\n"))); + } + db->release (); + return 0; // NULL pointer + } + + return qd; +} + +size_t +TAO_GIOP_Message_Base::header_length (void) const +{ + return TAO_GIOP_MESSAGE_HEADER_LEN; +} + +size_t +TAO_GIOP_Message_Base::fragment_header_length (CORBA::Octet major, + CORBA::Octet minor) const +{ + TAO_GIOP_Message_Generator_Parser *generator_parser = 0; + + // Get the state information that we need to use + this->set_state (major, + minor, + generator_parser); + + return generator_parser->fragment_header_length (); +} + +void +TAO_GIOP_Message_Base::init_queued_data ( + TAO_Queued_Data* qd, + const TAO_GIOP_Message_State& state) const +{ + qd->byte_order_ = state.byte_order_; + qd->major_version_ = state.giop_version_.major; + qd->minor_version_ = state.giop_version_.minor; + qd->more_fragments_ = state.more_fragments_; + qd->msg_type_ = this->message_type (state); +} + +int +TAO_GIOP_Message_Base::parse_request_id (const TAO_Queued_Data *qd, + CORBA::ULong &request_id) const +{ + // Get a parser for us + TAO_GIOP_Message_Generator_Parser *generator_parser = 0; + + // Get the state information that we need to use + this->set_state (qd->major_version_, + qd->minor_version_, + generator_parser); + + // Get the read and write positions before we steal data. + size_t rd_pos = qd->msg_block_->rd_ptr () - qd->msg_block_->base (); + size_t wr_pos = qd->msg_block_->wr_ptr () - qd->msg_block_->base (); + rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN; + + // Create a input CDR stream. We do the following + // 1 - If the incoming message block has a data block with a flag + // DONT_DELETE (for the data block) we create an input CDR + // stream the same way. + // 2 - If the incoming message block had a datablock from heap just + // use it by duplicating it and make the flag 0. + // NOTE: We use the same data block in which we read the message and + // we pass it on to the higher layers of the ORB. So we dont to any + // copies at all here. The same is also done in the higher layers. + + ACE_Message_Block::Message_Flags flg = 0; + ACE_Data_Block *db = 0; + + // Get the flag in the message block + flg = qd->msg_block_->self_flags (); + + if (ACE_BIT_ENABLED (flg, + ACE_Message_Block::DONT_DELETE)) + { + // Use the same datablock + db = qd->msg_block_->data_block (); + } + else + { + // Use a duplicated datablock as the datablock has come off the + // heap. + db = qd->msg_block_->data_block ()->duplicate (); + } + + TAO_InputCDR input_cdr (db, + flg, + rd_pos, + wr_pos, + qd->byte_order_, + qd->major_version_, + qd->minor_version_, + this->orb_core_); + + if (qd->major_version_ == 1 && + (qd->minor_version_ == 0 || qd->minor_version_ == 1)) + { + if (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_REQUEST || + qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_REPLY) + { + IOP::ServiceContextList service_context; + + if ((input_cdr >> service_context) + && (input_cdr >> request_id)) + { + return 0; + } + } + else if (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_CANCELREQUEST || + qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST || + qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_LOCATEREPLY) + { + if ((input_cdr >> request_id)) + { + return 0; + } + } + } + else + { + if (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_REQUEST || + qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_REPLY || + qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT || + qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_CANCELREQUEST || + qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST || + qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_LOCATEREPLY) + { + // Dealing with GIOP-1.2, the request-id is located directly + // behind the GIOP-Header. This is true for all message + // types that might be sent in form of fragments or + // cancel-requests. + if ((input_cdr >> request_id)) + { + return 0; + } + } + } + + return -1; +} + +/* @return -1 error, 0 ok, +1 outstanding fragments */ +int +TAO_GIOP_Message_Base::consolidate_fragmented_message ( + TAO_Queued_Data * qd, + TAO_Queued_Data *& msg) +{ + TAO::Incoming_Message_Stack reverse_stack; + + TAO_Queued_Data *tail = 0; + TAO_Queued_Data *head = 0; + + // + // CONSOLIDATE FRAGMENTED MESSAGE + // + + // check for error-condition + if (qd == 0) + { + return -1; + } + + if (qd->major_version_ == 1 && qd->minor_version_ == 0) + { + TAO_Queued_Data::release (qd); + return -1; // error: GIOP-1.0 does not support fragments + } + + // If this is not the last fragment, push it onto stack for later processing + if (qd->more_fragments_) + { + this->fragment_stack_.push (qd); + + msg = 0; // no consolidated message available yet + return 1; // status: more messages expected. + } + + tail = qd; // init + + // Add the current message block to the end of the chain + // after adjusting the read pointer to skip the header(s) + const size_t header_adjustment = + this->header_length () + + this->fragment_header_length (tail->major_version_, + tail->minor_version_); + + if (tail->msg_block_->length () < header_adjustment) + { + // buffer length not sufficient + TAO_Queued_Data::release (qd); + return -1; + } + + // duplicate code to speed up both processes, for GIOP-1.1 and GIOP-1.2 + if (tail->major_version_ == 1 && tail->minor_version_ == 1) + { + // GIOP-1.1 + + while (this->fragment_stack_.pop (head) != -1) + { + if (head->more_fragments_ && + head->major_version_ == 1 && + head->minor_version_ == 1 && + head->msg_block_->length () >= header_adjustment) + { + // adjust the read-pointer, skip the fragment header + tail->msg_block_->rd_ptr(header_adjustment); + + head->msg_block_->cont (tail->msg_block_); + + tail->msg_block_ = 0; + + TAO_Queued_Data::release (tail); + + tail = head; + } + else + { + reverse_stack.push (head); + } + } + } + else + { + // > GIOP-1.2 + + CORBA::ULong tmp_request_id = 0; + if (this->parse_request_id (tail, tmp_request_id) == -1) + { + return -1; + } + + const CORBA::ULong request_id = tmp_request_id; + + while (this->fragment_stack_.pop (head) != -1) + { + CORBA::ULong head_request_id = 0; + int parse_status = 0; + + if (head->more_fragments_ && + head->major_version_ >= 1 && + head->minor_version_ >= 2 && + head->msg_block_->length () >= header_adjustment && + (parse_status = this->parse_request_id (head, head_request_id)) != -1 && + request_id == head_request_id) + { + // adjust the read-pointer, skip the fragment header + tail->msg_block_->rd_ptr(header_adjustment); + + head->msg_block_->cont (tail->msg_block_); + + tail->msg_block_ = 0; + + TAO_Queued_Data::release (tail); + + tail = head; + } + else + { + if (parse_status == -1) + { + TAO_Queued_Data::release (head); + return -1; + } + + reverse_stack.push (head); + } + } + } + + // restore stack + while (reverse_stack.pop (head) != -1) + { + this->fragment_stack_.push (head); + } + + if (tail->consolidate () == -1) + { + // memory allocation failed + TAO_Queued_Data::release (tail); + return -1; + } + + // set out value + msg = tail; + + return 0; +} + + +int +TAO_GIOP_Message_Base::discard_fragmented_message (const TAO_Queued_Data *cancel_request) +{ + // We must extract the specific request-id from message-buffer + // and remove all fragments from stack that match this request-id. + + TAO::Incoming_Message_Stack reverse_stack; + + CORBA::ULong cancel_request_id; + + if (this->parse_request_id (cancel_request, cancel_request_id) == -1) + { + return -1; + } + + TAO_Queued_Data *head = 0; + + // Revert stack + while (this->fragment_stack_.pop (head) != -1) + { + reverse_stack.push (head); + } + + bool discard_all_GIOP11_messages = false; + + // Now we are able to process message in order they have arrived. + // If the cancel_request_id matches to GIOP-1.1 message, all succeeding + // fragments belong to this message and must be discarded. + // Note: GIOP-1.1 fragment header dont have any request-id encoded. If the + // cancel_request_id matches GIOP-1.2 messages, all GIOP-1.2 fragments + // having encoded the request id will be discarded. + while (reverse_stack.pop (head) != -1) + { + CORBA::ULong head_request_id; + + if (head->major_version_ == 1 && + head->minor_version_ <= 1 && + head->msg_type_ != TAO_PLUGGABLE_MESSAGE_FRAGMENT && // GIOP11 fragment does not provide request id + this->parse_request_id (head, head_request_id) >= 0 && + cancel_request_id == head_request_id) + { + TAO_Queued_Data::release (head); + discard_all_GIOP11_messages = true; + } + else if (head->major_version_ == 1 && + head->minor_version_ <= 1 && + discard_all_GIOP11_messages) + { + TAO_Queued_Data::release (head); + } + else if (head->major_version_ >= 1 && + head->minor_version_ >= 2 && + this->parse_request_id (head, head_request_id) >= 0 && + cancel_request_id == head_request_id) + { + TAO_Queued_Data::release (head); + } + else + { + this->fragment_stack_.push (head); + } + } + + return 0; +} + +TAO_GIOP_Fragmentation_Strategy * +TAO_GIOP_Message_Base::fragmentation_strategy (void) +{ + return this->fragmentation_strategy_.get (); +} + +void +TAO_GIOP_Message_Base::set_giop_flags (TAO_OutputCDR & msg) const +{ + CORBA::Octet * const buf = + reinterpret_cast<CORBA::Octet *> (const_cast<char *> (msg.buffer ())); + + CORBA::Octet const & major = buf[TAO_GIOP_VERSION_MAJOR_OFFSET]; + CORBA::Octet const & minor = buf[TAO_GIOP_VERSION_MINOR_OFFSET]; + + // Flags for the GIOP protocol header "flags" field. + CORBA::Octet & flags = buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET]; + + // Least significant bit: Byte order + ACE_SET_BITS (flags, TAO_ENCAP_BYTE_ORDER ^ msg.do_byte_swap ()); + + // Second least significant bit: More fragments + // + // Only supported in GIOP 1.1 or better. + if (!(major <= 1 && minor == 0)) + ACE_SET_BITS (flags, msg.more_fragments () << 1); +} + +TAO_END_VERSIONED_NAMESPACE_DECL |