// $Id$ #include "GIOP_Message_Base.h" #include "operation_details.h" #include "debug.h" #include "ORB_Core.h" #include "TAO_Server_Request.h" #include "GIOP_Message_Locate_Header.h" #include "Transport.h" #include "Transport_Mux_Strategy.h" #include "LF_Strategy.h" #include "Request_Dispatcher.h" #include "Codeset_Manager.h" #include "SystemException.h" #if !defined (__ACE_INLINE__) # include "GIOP_Message_Base.i" #endif /* __ACE_INLINE__ */ ACE_RCSID (tao, GIOP_Message_Base, "$Id$") namespace { enum { GIOP_HEADER_ALIGNMENT_OFFSET = 4 } ; } // namespace { TAO_InputCDR placeholder_cdr( static_cast(0)) ; } TAO_GIOP_Message_Base::TAO_GIOP_Message_Base (TAO_ORB_Core *orb_core, size_t /*input_cdr_size*/) : orb_core_ (orb_core) , message_state_ (orb_core, this) , out_stream_ (this->buffer_, sizeof this->buffer_, /* ACE_CDR::DEFAULT_BUFSIZE */ TAO_ENCAP_BYTE_ORDER, orb_core->output_cdr_buffer_allocator (), orb_core->output_cdr_dblock_allocator (), orb_core->output_cdr_msgblock_allocator (), orb_core->orb_params ()->cdr_memcpy_tradeoff (), TAO_DEF_GIOP_MAJOR, TAO_DEF_GIOP_MINOR) { #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE) ACE_OS::memset(buffer_, 0, sizeof (buffer_)); #endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */ } TAO_GIOP_Message_Base::~TAO_GIOP_Message_Base (void) { } void TAO_GIOP_Message_Base::init (CORBA::Octet major, CORBA::Octet minor) { // Set the giop version of the out stream this->out_stream_.set_version (major, minor); } TAO_OutputCDR & TAO_GIOP_Message_Base::out_stream (void) { return this->out_stream_; } void TAO_GIOP_Message_Base::reset (void) { // no-op } int TAO_GIOP_Message_Base::generate_request_header ( TAO_Operation_Details &op, TAO_Target_Specification &spec, TAO_OutputCDR &cdr ) { // Get a parser for us TAO_GIOP_Message_Generator_Parser *generator_parser = 0; CORBA::Octet major, minor; cdr.get_version (major, minor); // Get the state information that we need to use this->set_state (major, minor, generator_parser); // Write the GIOP header first if (!this->write_protocol_header (TAO_GIOP_REQUEST, cdr)) { if (TAO_debug_level) { ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%P|%t) Error in writing GIOP header \n"))); } return -1; } // Now call the implementation for the rest of the header if (!generator_parser->write_request_header (op, spec, cdr)) { if (TAO_debug_level) ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%P|%t) Error in writing request header \n"))); return -1; } return 0; } int TAO_GIOP_Message_Base::generate_locate_request_header ( TAO_Operation_Details &op, TAO_Target_Specification &spec, TAO_OutputCDR &cdr ) { // Get a parser for us TAO_GIOP_Message_Generator_Parser *generator_parser = 0; CORBA::Octet major, minor; cdr.get_version (major, minor); // Get the state information that we need to use this->set_state (major, minor, generator_parser); // Write the GIOP header first if (!this->write_protocol_header (TAO_GIOP_LOCATEREQUEST, cdr)) { if (TAO_debug_level) ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%P|%t) Error in writing GIOP header \n"))); return -1; } // Now call the implementation for the rest of the header if (!generator_parser->write_locate_request_header (op.request_id (), spec, cdr)) { if (TAO_debug_level) ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%P|%t) Error in writing locate request header \n"))); return -1; } return 0; } int TAO_GIOP_Message_Base::generate_reply_header ( TAO_OutputCDR &cdr, TAO_Pluggable_Reply_Params_Base ¶ms ) { // Get a parser for us TAO_GIOP_Message_Generator_Parser *generator_parser = 0; CORBA::Octet major, minor; cdr.get_version (major, minor); // Get the state information that we need to use this->set_state (major, minor, generator_parser); // Write the GIOP header first if (!this->write_protocol_header (TAO_GIOP_REPLY, cdr)) { if (TAO_debug_level) ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%P|%t) Error in writing GIOP header \n"))); return -1; } ACE_DECLARE_NEW_CORBA_ENV; ACE_TRY { // Now call the implementation for the rest of the header int result = generator_parser->write_reply_header (cdr, params ACE_ENV_ARG_PARAMETER); ACE_TRY_CHECK; if (!result) { if (TAO_debug_level > 4) ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%P|%t) Error in writing reply ") ACE_TEXT ("header\n"))); return -1; } } ACE_CATCHANY { if (TAO_debug_level > 4) ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_GIOP_Message_Base::generate_reply_header"); return -1; } ACE_ENDTRY; return 0; } int TAO_GIOP_Message_Base::read_message (TAO_Transport * /*transport*/, int /*block */, ACE_Time_Value * /*max_wait_time*/) { return 0; } int TAO_GIOP_Message_Base::format_message (TAO_OutputCDR &stream) { // Ptr to first buffer. char *buf = (char *) stream.buffer (); // Length of all buffers. size_t total_len = stream.total_length (); // NOTE: Here would also be a fine place to calculate a digital // signature for the message and place it into a preallocated slot // in the "ServiceContext". Similarly, this is a good spot to // encrypt messages (or just the message bodies) if that's needed in // this particular environment and that isn't handled by the // networking infrastructure (e.g., IPSEC). CORBA::ULong bodylen = static_cast (total_len - TAO_GIOP_MESSAGE_HEADER_LEN); #if !defined (ACE_ENABLE_SWAP_ON_WRITE) *reinterpret_cast (buf + TAO_GIOP_MESSAGE_SIZE_OFFSET) = bodylen; #else if (!stream.do_byte_swap ()) *reinterpret_cast (buf + TAO_GIOP_MESSAGE_SIZE_OFFSET) = bodylen; else ACE_CDR::swap_4 (reinterpret_cast(&bodylen), buf + TAO_GIOP_MESSAGE_SIZE_OFFSET); #endif /* ACE_ENABLE_SWAP_ON_WRITE */ if (TAO_debug_level > 2) { // Check whether the output cdr stream is build up of multiple // messageblocks. If so, consolidate them to one block that can be // dumped ACE_Message_Block* consolidated_block = 0; if (stream.begin()->cont () != 0) { consolidated_block = new ACE_Message_Block; ACE_CDR::consolidate (consolidated_block, stream.begin ()); buf = (char *) (consolidated_block->rd_ptr ()); } /// this->dump_msg ("send", reinterpret_cast(buf), total_len); // delete consolidated_block; consolidated_block = 0; // } return 0; } TAO_Pluggable_Message_Type TAO_GIOP_Message_Base::message_type ( TAO_GIOP_Message_State &msg_state) { // Convert to the right type of Pluggable Messaging message type. switch (msg_state.message_type_) { case TAO_GIOP_REQUEST: return TAO_PLUGGABLE_MESSAGE_REQUEST; case TAO_GIOP_LOCATEREQUEST: return TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST; case TAO_GIOP_LOCATEREPLY: return TAO_PLUGGABLE_MESSAGE_LOCATEREPLY; case TAO_GIOP_REPLY: return TAO_PLUGGABLE_MESSAGE_REPLY; case TAO_GIOP_CLOSECONNECTION: return TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION; case TAO_GIOP_FRAGMENT: return TAO_PLUGGABLE_MESSAGE_FRAGMENT; case TAO_GIOP_MESSAGERROR: case TAO_GIOP_CANCELREQUEST: // Does it happen? why?? default: ACE_ERROR ((LM_ERROR, ACE_TEXT ("TAO (%P|%t) %N:%l message_type : ") ACE_TEXT ("wrong message.\n"))); } return TAO_PLUGGABLE_MESSAGE_MESSAGERROR; } int TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport, TAO_Queued_Data *qd) { if( TAO_debug_level > 3) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT("TAO (%P|%t) - Transport[%d], ") ACE_TEXT("TAO_GIOP_Message_Base::process_request_message: ") ACE_TEXT("entered with chained data: %s, refcount: %d, total length: %d, ") ACE_TEXT("start: %x should be aligned to %d bytes.\n"), transport->id (), ((qd->msg_block_->cont()==0)? ACE_TEXT("no"): ACE_TEXT("yes")), qd->msg_block_->data_block()->reference_count(), qd->msg_block_->total_length(), reinterpret_cast(qd->msg_block_->rd_ptr()), // ACE_CDR::LONG_ALIGN (ACE_CDR::MAX_ALIGNMENT + GIOP_HEADER_ALIGNMENT_OFFSET) )); } // Set the upcall thread this->orb_core_->lf_strategy ().set_upcall_thread (this->orb_core_->leader_follower ()); // Get a parser for us TAO_GIOP_Message_Generator_Parser *generator_parser = 0; // Get the state information that we need to use this->set_state (qd->major_version_, qd->minor_version_, generator_parser); // A buffer that we will use to initialise the CDR stream #if defined (ACE_HAS_PURIFY) char repbuf[ACE_CDR::DEFAULT_BUFSIZE] = { 0 }; #else char repbuf[ACE_CDR::DEFAULT_BUFSIZE]; #endif /* ACE_HAS_PURIFY */ // Initialze an output CDR on the stack // NOTE: Don't jump to a conclusion as to why we are using the // inpout_cdr and hence the global pool here. These pools will move // to the lanes anyway at some point of time. Further, it would have // been awesome to have this in TSS. But for some reason the cloning // that happens when the ORB gets flow controlled while writing a // reply is messing things up. We crash horribly. Doing this adds a // lock, we need to set things like this -- put stuff in TSS here // and transfer to global memory when we get flow controlled. We // need to work on the message block to get it right! TAO_OutputCDR output (repbuf, sizeof repbuf, TAO_ENCAP_BYTE_ORDER, this->orb_core_->input_cdr_buffer_allocator (), this->orb_core_->input_cdr_dblock_allocator (), this->orb_core_->input_cdr_msgblock_allocator (), this->orb_core_->orb_params ()->cdr_memcpy_tradeoff (), qd->major_version_, qd->minor_version_); // Get the read and write positions before we steal data. size_t rd_pos = qd->msg_block_->rd_ptr () - qd->msg_block_->base (); size_t wr_pos = qd->msg_block_->wr_ptr () - qd->msg_block_->base (); if (TAO_debug_level >= 10) qd->dump_msg ("request message") ; // // I tried making the placeholder static, but there were issues when // the static destructors ran, so I waste time here. I really need // just a static nil CDR to use while I determine which CDR to // actually create for demarshaling. // TAO_InputCDR placeholder_cdr( static_cast(0)) ; TAO_InputCDR& input_cdr = placeholder_cdr ; if( qd->msg_block_->cont() == 0 && ((reinterpret_cast(qd->msg_block_->rd_ptr()) % ACE_CDR::MAX_ALIGNMENT ) == GIOP_HEADER_ALIGNMENT_OFFSET)) { // // Message is aligned and in a single block, no copy required. // TAO_InputCDR not_copied_cdr(qd->msg_block_->data_block ()->duplicate(), qd->msg_block_->self_flags (), rd_pos, wr_pos, qd->byte_order_, qd->major_version_, qd->minor_version_, this->orb_core_); input_cdr = not_copied_cdr ; } else { // // Prepend a stack buffer with enough data to ensure correct // alignment of the data. // #if defined (ACE_HAS_PURIFY) char buffer[ GIOP_HEADER_ALIGNMENT_OFFSET + ACE_CDR::MAX_ALIGNMENT] = { 0 } ; #else char buffer[ GIOP_HEADER_ALIGNMENT_OFFSET + ACE_CDR::MAX_ALIGNMENT] ; #endif /* ACE_HAS_PURIFY */ ACE_Message_Block header_proxy( buffer, GIOP_HEADER_ALIGNMENT_OFFSET) ; ACE_CDR::mb_align( &header_proxy) ; header_proxy.wr_ptr( GIOP_HEADER_ALIGNMENT_OFFSET) ; header_proxy.cont( qd->msg_block_) ; // // Let the constructor to the copying. // TAO_InputCDR copied_cdr( &header_proxy, qd->byte_order_, qd->major_version_, qd->minor_version_, this->orb_core_) ; input_cdr = copied_cdr ; // // Skip the empty header alignment bytes. // input_cdr.skip_bytes( GIOP_HEADER_ALIGNMENT_OFFSET) ; } if( TAO_debug_level > 3) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT("TAO (%P|%t) - Transport[%d], ") ACE_TEXT("TAO_GIOP_Message_Base::process_request_message: ") ACE_TEXT("cdr starts at: %x\n"), transport->id (), input_cdr.start()->rd_ptr() )); } if (TAO_debug_level >= 10) ACE_HEX_DUMP ((LM_DEBUG, (const char *) input_cdr.start()->rd_ptr(), input_cdr.start()->length(), ACE_TEXT ("Request message"))); transport->assign_translators(&input_cdr,&output); // We know we have some request message. Check whether it is a // GIOP_REQUEST or GIOP_LOCATE_REQUEST to take action. // Once we send the InputCDR stream we need to just forget about // the stream and never touch that again for anything. We basically // loose ownership of the data_block. switch (qd->msg_type_) { case TAO_PLUGGABLE_MESSAGE_REQUEST: // Should be taken care by the state specific invocations. They // could raise an exception or write things in the output CDR // stream return this->process_request (transport, input_cdr, output, generator_parser); break ; case TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST: return this->process_locate_request (transport, input_cdr, output, generator_parser); break ; default: return -1; break ; } } int TAO_GIOP_Message_Base::process_reply_message ( TAO_Pluggable_Reply_Params ¶ms, TAO_Queued_Data *qd) { if( TAO_debug_level > 3) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT("TAO (%P|%t) - ") ACE_TEXT("TAO_GIOP_Message_Base[%d]::process_reply_message: ") ACE_TEXT("entered with chained data: %s, refcount: %d, total length: %d, ") ACE_TEXT("base: %x should be aligned to %d bytes.\n"), params.transport_->id (), ((qd->msg_block_->cont()==0)? ACE_TEXT("no"): ACE_TEXT("yes")), qd->msg_block_->data_block()->reference_count(), qd->msg_block_->total_length(), reinterpret_cast(qd->msg_block_->rd_ptr()), // ACE_CDR::LONG_ALIGN ACE_CDR::MAX_ALIGNMENT )); } // Get a parser for us TAO_GIOP_Message_Generator_Parser *generator_parser = 0; // Get the state information that we need to use this->set_state (qd->major_version_, qd->minor_version_, generator_parser); // Get the read and write positions before we steal data. size_t rd_pos = qd->msg_block_->rd_ptr () - qd->msg_block_->base (); size_t wr_pos = qd->msg_block_->wr_ptr () - qd->msg_block_->base (); // We are now eliding the header before calling this method. //rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN; if (TAO_debug_level >= 10) qd->dump_msg ("reply message"); // // I tried making the placeholder static, but there were issues when // the static destructors ran, so I waste time here. I really need // just a static nil CDR to use while I determine which CDR to // actually create for demarshaling. // TAO_InputCDR placeholder_cdr( static_cast(0)) ; TAO_InputCDR& input_cdr = placeholder_cdr ; if( qd->msg_block_->cont() == 0 && ((reinterpret_cast(qd->msg_block_->rd_ptr()) % ACE_CDR::MAX_ALIGNMENT ) == GIOP_HEADER_ALIGNMENT_OFFSET)) { // // Message is aligned and in a single block, no copy required. // TAO_InputCDR not_copied_cdr(qd->msg_block_->data_block ()->duplicate(), qd->msg_block_->self_flags (), rd_pos, wr_pos, qd->byte_order_, qd->major_version_, qd->minor_version_, this->orb_core_); input_cdr = not_copied_cdr ; } else { // // Prepend a stack buffer with enough data to ensure correct // alignment of the data. // #if defined (ACE_HAS_PURIFY) char buffer[ GIOP_HEADER_ALIGNMENT_OFFSET + ACE_CDR::MAX_ALIGNMENT] = { 0 } ; #else char buffer[ GIOP_HEADER_ALIGNMENT_OFFSET + ACE_CDR::MAX_ALIGNMENT] ; #endif /* ACE_HAS_PURIFY */ ACE_Message_Block header_proxy( buffer, GIOP_HEADER_ALIGNMENT_OFFSET) ; ACE_CDR::mb_align( &header_proxy) ; header_proxy.wr_ptr( GIOP_HEADER_ALIGNMENT_OFFSET) ; header_proxy.cont( qd->msg_block_) ; // // Let the constructor to the copying. // TAO_InputCDR copied_cdr( &header_proxy, qd->byte_order_, qd->major_version_, qd->minor_version_, this->orb_core_) ; input_cdr = copied_cdr ; // // Skip the empty header alignment bytes. // input_cdr.skip_bytes( GIOP_HEADER_ALIGNMENT_OFFSET) ; } if( TAO_debug_level > 3) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT("TAO (%P|%t) - ") ACE_TEXT("TAO_GIOP_Message_Base[%d]::process_reply_message: ") ACE_TEXT("cdr starts at: %x\n"), params.transport_->id (), input_cdr.start()->rd_ptr() )); } if (TAO_debug_level >= 10) ACE_HEX_DUMP ((LM_DEBUG, (const char *) input_cdr.start()->rd_ptr(), input_cdr.start()->length(), ACE_TEXT ("Reply message"))); // We know we have some reply message. Check whether it is a // GIOP_REPLY or GIOP_LOCATE_REPLY to take action. // Once we send the InputCDR stream we need to just forget about // the stream and never touch that again for anything. We basically // loose ownership of the data_block. int retval = 0; switch (qd->msg_type_) { case TAO_PLUGGABLE_MESSAGE_REPLY: // Should be taken care by the state specific parsing retval = generator_parser->parse_reply (input_cdr, params); break; case TAO_PLUGGABLE_MESSAGE_LOCATEREPLY: retval = generator_parser->parse_locate_reply (input_cdr, params); break; default: retval = -1; } if( TAO_debug_level > 3) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT("TAO (%P|%t) - ") ACE_TEXT("TAO_GIOP_Message_Base[%d]::process_reply_message: ") ACE_TEXT("parsed reply, return value: %d\n"), params.transport_->id (), retval )); } if (retval == -1) return retval; params.input_cdr_ = &input_cdr; retval = params.transport_->tms ()->dispatch_reply (params); if( TAO_debug_level > 3) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT("TAO (%P|%t) - ") ACE_TEXT("TAO_GIOP_Message_Base[%d]::process_reply_message: ") ACE_TEXT("dispatched reply, return value: %d\n"), params.transport_->id (), retval )); } if (retval == -1) { // Something really critical happened, we will forget about // every reply on this connection. if (TAO_debug_level > 0) ACE_ERROR ((LM_ERROR, "TAO (%P|%t) - GIOP_Message_Base[%d]::process_parsed_messages, " "dispatch reply failed\n", params.transport_->id ())); } return retval; } int TAO_GIOP_Message_Base::generate_exception_reply ( TAO_OutputCDR &cdr, TAO_Pluggable_Reply_Params_Base ¶ms, CORBA::Exception &x ) { // A new try/catch block, but if something goes wrong now we have no // hope, just abort. ACE_DECLARE_NEW_CORBA_ENV; ACE_TRY { // Make the GIOP & reply header. this->generate_reply_header (cdr, params); x._tao_encode (cdr ACE_ENV_ARG_PARAMETER); ACE_TRY_CHECK; } ACE_CATCH (CORBA::Exception, ex) { // Now we know that while handling the error an other error // happened -> no hope, close connection. // Close the handle. ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t|%N|%l) cannot marshal exception, ") ACE_TEXT ("generate_exception_reply ()"))); return -1; } ACE_ENDTRY; ACE_CHECK_RETURN (-1); return 0; } int TAO_GIOP_Message_Base::write_protocol_header (TAO_GIOP_Message_Type t, TAO_OutputCDR &msg) { // Reset the message type // Reset the message type msg.reset (); CORBA::Octet header[12] = { // The following works on non-ASCII platforms, such as MVS (which // uses EBCDIC). 0x47, // 'G' 0x49, // 'I' 0x4f, // 'O' 0x50 // 'P' }; CORBA::Octet major, minor = 0; msg.get_version (major, minor); header[4] = major; header[5] = minor; // We are putting the byte order. But at a later date if we support // fragmentation and when we want to use the other 6 bits in this // octet we can have a virtual function do this for us as the // version info , Bala header[6] = (TAO_ENCAP_BYTE_ORDER ^ msg.do_byte_swap ()); header[7] = CORBA::Octet(t); static int header_size = sizeof (header) / sizeof (header[0]); msg.write_octet_array (header, header_size); return msg.good_bit (); } int TAO_GIOP_Message_Base::process_request (TAO_Transport *transport, TAO_InputCDR &cdr, TAO_OutputCDR &output, TAO_GIOP_Message_Generator_Parser *parser) { // This will extract the request header, set // and as appropriate. TAO_ServerRequest request (this, cdr, output, transport, this->orb_core_); CORBA::ULong request_id = 0; CORBA::Boolean response_required = 0; int parse_error = 0; ACE_DECLARE_NEW_CORBA_ENV; ACE_TRY { parse_error = parser->parse_request_header (request); if (parse_error != 0) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT("TAO (%P|%t) - TAO_GIOP_Message_Base::process_request, ") ACE_TEXT("ERROR parsing request header.\n") )); } TAO_Codeset_Manager *csm = request.orb_core()->codeset_manager(); if (csm) csm->process_service_context(request); transport->assign_translators(&cdr,&output); // Throw an exception if the if (parse_error != 0) ACE_TRY_THROW (CORBA::MARSHAL (0, CORBA::COMPLETED_NO)); request_id = request.request_id (); response_required = request.response_expected (); CORBA::Object_var forward_to; if (TAO_debug_level > 3) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT("TAO (%P|%t) - TAO_GIOP_Message_Base::process_request, ") ACE_TEXT("dispatching request.\n") )); } // Do this before the reply is sent. this->orb_core_->request_dispatcher ()->dispatch ( this->orb_core_, request, forward_to ACE_ENV_ARG_PARAMETER); ACE_TRY_CHECK; if (TAO_debug_level > 3) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT("TAO (%P|%t) - TAO_GIOP_Message_Base::process_request, ") ACE_TEXT("returned from dispatching request.\n") )); } if (!CORBA::is_nil (forward_to.in ())) { // We should forward to another object... TAO_Pluggable_Reply_Params_Base reply_params; reply_params.request_id_ = request_id; reply_params.reply_status_ = TAO_GIOP_LOCATION_FORWARD; reply_params.svc_ctx_.length (0); // Send back the reply service context. reply_params.service_context_notowned (&request.reply_service_info ()); // Make the GIOP header and Reply header this->generate_reply_header (output, reply_params); if (!(output << forward_to.in ())) { if (TAO_debug_level > 0) ACE_ERROR ((LM_ERROR, ACE_TEXT ("TAO (%P|%t) ERROR: Unable to marshal ") ACE_TEXT ("forward reference.\n"))); return -1; } int result = transport->send_message (output, 0, TAO_Transport::TAO_REPLY); if (result == -1) { if (TAO_debug_level > 0) { // No exception but some kind of error, yet a // response is required. ACE_ERROR ((LM_ERROR, ACE_TEXT ("TAO: (%P|%t|%N|%l) %p: ") ACE_TEXT ("cannot send reply\n"), ACE_TEXT ("TAO_GIOP_Message_Base::process_request"))); } } return result; } } // Only CORBA exceptions are caught here. ACE_CATCHANY { int result = 0; if (response_required) { result = this->send_reply_exception (transport, output, request_id, &request.reply_service_info (), &ACE_ANY_EXCEPTION); if (result == -1) { if (TAO_debug_level > 0) { ACE_ERROR ((LM_ERROR, ACE_TEXT ("TAO: (%P|%t|%N|%l) %p: ") ACE_TEXT ("cannot send exception\n"), ACE_TEXT ("process_connector_request ()"))); ACE_PRINT_EXCEPTION ( ACE_ANY_EXCEPTION, "TAO_GIOP_Message_Base::process_request[1]"); } } } else if (TAO_debug_level > 0) { // It is unfortunate that an exception (probably a system // exception) was thrown by the upcall code (even by the // user) when the client was not expecting a response. // However, in this case, we cannot close the connection // down, since it really isn't the client's fault. ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%P|%t) exception thrown ") ACE_TEXT ("but client is not waiting a response\n"))); ACE_PRINT_EXCEPTION ( ACE_ANY_EXCEPTION, "TAO_GIOP_Message_Base::process_request[2]"); } return result; } #if defined (TAO_HAS_EXCEPTIONS) ACE_CATCHALL { // @@ TODO some c++ exception or another, but what do we do with // it? // We are supposed to map it into a CORBA::UNKNOWN exception. // BTW, this cannot be detected if using the mapping. If // we have native exceptions but no support for them in the ORB // we should still be able to catch it. If we don't have native // exceptions it couldn't have been raised in the first place! int result = 0; if (response_required) { CORBA::UNKNOWN exception (CORBA::SystemException::_tao_minor_code (TAO_UNHANDLED_SERVER_CXX_EXCEPTION, 0), CORBA::COMPLETED_MAYBE); result = this->send_reply_exception (transport, output, request_id, &request.reply_service_info (), &exception); if (result == -1) { if (TAO_debug_level > 0) { ACE_ERROR ((LM_ERROR, ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_request[3], ") ACE_TEXT ("%p: ") ACE_TEXT ("cannot send exception\n"), ACE_TEXT ("process_request ()"))); ACE_PRINT_EXCEPTION ( exception, "TAO_GIOP_Message_Base::process_request[3]"); } } } else if (TAO_debug_level > 0) { // It is unfotunate that an exception (probably a system // exception) was thrown by the upcall code (even by the // user) when the client was not expecting a response. // However, in this case, we cannot close the connection // down, since it really isn't the client's fault. ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%P|%t|%N|%l) exception thrown ") ACE_TEXT ("but client is not waiting a response\n"))); } return result; } #endif /* TAO_HAS_EXCEPTIONS */ ACE_ENDTRY; return 0; } int TAO_GIOP_Message_Base::process_locate_request (TAO_Transport *transport, TAO_InputCDR &input, TAO_OutputCDR &output, TAO_GIOP_Message_Generator_Parser *parser) { // This will extract the request header, set as // appropriate. TAO_GIOP_Locate_Request_Header locate_request (input, this->orb_core_); TAO_GIOP_Locate_Status_Msg status_info; // Defaulting. status_info.status = TAO_GIOP_UNKNOWN_OBJECT; CORBA::Boolean response_required = 1; ACE_DECLARE_NEW_CORBA_ENV; ACE_TRY { int parse_error = parser->parse_locate_header (locate_request); if (parse_error != 0) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ") ACE_TEXT("ERROR parsing locate request header.\n") )); } if (parse_error != 0) { ACE_TRY_THROW (CORBA::MARSHAL (0, CORBA::COMPLETED_NO)); } TAO::ObjectKey tmp_key (locate_request.object_key ().length (), locate_request.object_key ().length (), locate_request.object_key ().get_buffer (), 0); // Set it to an error state parse_error = 1; CORBA::ULong req_id = locate_request.request_id (); // We will send the reply. The ServerRequest class need not send // the reply CORBA::Boolean deferred_reply = 1; TAO_ServerRequest server_request (this, req_id, response_required, deferred_reply, tmp_key, "_non_existent", output, transport, this->orb_core_, parse_error); if (parse_error != 0) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ") ACE_TEXT("ERROR creating server request.\n") )); } if (parse_error != 0) { ACE_TRY_THROW (CORBA::MARSHAL (0, CORBA::COMPLETED_NO)); } CORBA::Object_var forward_to; this->orb_core_->request_dispatcher ()->dispatch ( this->orb_core_, server_request, forward_to ACE_ENV_ARG_PARAMETER); ACE_TRY_CHECK; if (!CORBA::is_nil (forward_to.in ())) { status_info.status = TAO_GIOP_OBJECT_FORWARD; status_info.forward_location_var = forward_to; ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ") ACE_TEXT ("called: forwarding\n"))); } else if (server_request.exception_type () == TAO_GIOP_NO_EXCEPTION) { // We got no exception, so the object is here. status_info.status = TAO_GIOP_OBJECT_HERE; if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ") ACE_TEXT ("found\n"))); } else { status_info.forward_location_var = server_request.forward_location (); if (!CORBA::is_nil (status_info.forward_location_var.in ())) { status_info.status = TAO_GIOP_OBJECT_FORWARD; ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ") ACE_TEXT ("forwarding\n"))); } else { // Normal exception, so the object is not here status_info.status = TAO_GIOP_UNKNOWN_OBJECT; ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ") ACE_TEXT ("not here\n"))); } } } ACE_CATCHANY { // Normal exception, so the object is not here status_info.status = TAO_GIOP_UNKNOWN_OBJECT; if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ") ACE_TEXT ("CORBA exception raised\n"))); } #if defined (TAO_HAS_EXCEPTIONS) ACE_CATCHALL { // Normal exception, so the object is not here status_info.status = TAO_GIOP_UNKNOWN_OBJECT; if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) TAO_GIOP_Message_Base::process_locate_request - ") ACE_TEXT ("C++ exception raised\n"))); } #endif /* TAO_HAS_EXCEPTIONS */ ACE_ENDTRY; return this->make_send_locate_reply (transport, locate_request, status_info, output, parser); } int TAO_GIOP_Message_Base::make_send_locate_reply (TAO_Transport *transport, TAO_GIOP_Locate_Request_Header &request, TAO_GIOP_Locate_Status_Msg &status_info, TAO_OutputCDR &output, TAO_GIOP_Message_Generator_Parser *parser) { // Note here we are making the Locate reply header which is *QUITE* // different from the reply header made by the make_reply () call.. // Make the GIOP message header this->write_protocol_header (TAO_GIOP_LOCATEREPLY, output); // This writes the header & body parser->write_locate_reply_mesg (output, request.request_id (), status_info); // Send the message int result = transport->send_message (output, 0, TAO_Transport::TAO_REPLY); // Print out message if there is an error if (result == -1) { if (TAO_debug_level > 0) { ACE_ERROR ((LM_ERROR, ACE_TEXT ("TAO: (%P|%t) %p: cannot send reply\n"), ACE_TEXT ("TAO_GIOP_Message_Base::make_send_locate_reply"))); } } return result; } // Send an "I can't understand you" message -- again, the message is // prefabricated for simplicity. This implies abortive disconnect (at // the application level, if not at the level of TCP). // // NOTE that IIOP will still benefit from TCP's orderly disconnect. int TAO_GIOP_Message_Base::send_error (TAO_Transport *transport) { const char error_message [TAO_GIOP_MESSAGE_HEADER_LEN] = { // The following works on non-ASCII platforms, such as MVS (which // uses EBCDIC). 0x47, // 'G' 0x49, // 'I' 0x4f, // 'O' 0x50, // 'P' (CORBA::Octet) 1, // Use the lowest GIOP version (CORBA::Octet) 0, TAO_ENCAP_BYTE_ORDER, TAO_GIOP_MESSAGERROR, 0, 0, 0, 0 }; // @@ Q: How does this works with GIOP lite? // A: It doesn't this->dump_msg ("send_error", (const u_char *) error_message, TAO_GIOP_MESSAGE_HEADER_LEN); ACE_Data_Block data_block (TAO_GIOP_MESSAGE_HEADER_LEN, ACE_Message_Block::MB_DATA, error_message, 0, 0, ACE_Message_Block::DONT_DELETE, 0); ACE_Message_Block message_block(&data_block, ACE_Message_Block::DONT_DELETE); message_block.wr_ptr (TAO_GIOP_MESSAGE_HEADER_LEN); size_t bt; int result = transport->send_message_block_chain (&message_block, bt); if (result == -1) { if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%N|%l|%P|%t) error sending error to transport %u\n"), transport->id ())); } return result; } void TAO_GIOP_Message_Base::set_state ( CORBA::Octet def_major, CORBA::Octet def_minor, TAO_GIOP_Message_Generator_Parser *&gen_parser) { switch (def_major) { case 1: switch (def_minor) { case 0: gen_parser = &this->tao_giop_impl_.tao_giop_10; break; case 1: gen_parser = &this->tao_giop_impl_.tao_giop_11; break; case 2: gen_parser = &this->tao_giop_impl_.tao_giop_12; break; default: break; } break; default: break; } } // Server sends an "I'm shutting down now, any requests you've sent me // can be retried" message to the server. The message is prefab, for // simplicity. // // NOTE: this is IIOP-specific though it doesn't look like it is. It // relies on a TCP-ism: orderly disconnect, which doesn't exist in all // transport protocols. Versions of GIOP atop some transport that's // lacking orderly disconnect must define some transport-specific // handshaking (e.g. the XNS/SPP handshake convention) in order to // know that the same transport semantics are provided when shutdown // is begun with messages "in flight". (IIOP doesn't report false // errors in the case of "clean shutdown", because it relies on // orderly disconnect as provided by TCP. This quality of service is // required to write robust distributed systems.) #if 0 void TAO_GIOP_Message_Base:: send_close_connection (const TAO_GIOP_Message_Version &version, TAO_Transport *transport, void *) { // static CORBA::Octet // I hate this in every method. Till the time I figure out a way // around I will have them here hanging around. const char close_message [TAO_GIOP_MESSAGE_HEADER_LEN] = { // The following works on non-ASCII platforms, such as MVS (which // uses EBCDIC). 0x47, // 'G' 0x49, // 'I' 0x4f, // 'O' 0x50, // 'P' version.major, version.minor, TAO_ENCAP_BYTE_ORDER, TAO_GIOP_CLOSECONNECTION, 0, 0, 0, 0 }; // It's important that we use a reliable shutdown after we send this // message, so we know it's received. // // @@ should recv and discard queued data for portability; note // that this won't block (long) since we never set SO_LINGER this->dump_msg ("send_close_connection", (const u_char *) close_message, TAO_GIOP_MESSAGE_HEADER_LEN); #if 0 // @@CJC I don't think we need this check b/c the transport's send() // will simply return -1. However, I guess we could create something // like TAO_Tranport::is_closed() that returns whether the connection // is already closed. The problem with that, however, is that it's // entirely possible that is_closed() could return TRUE, and then the // transport could get closed down btw. the time it gets called and the // time that the send actually occurs. ACE_HANDLE which = transport->handle (); if (which == ACE_INVALID_HANDLE) { if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) TAO_GIOP_Message_Base::send_close_connection -" " connection already closed\n")); return; } #endif ACE_Data_Block data_block (TAO_GIOP_MESSAGE_HEADER_LEN, ACE_Message_Block::MB_DATA, close_message, 0, 0, ACE_Message_Block::DONT_DELETE, 0); ACE_Message_Block message_block(&data_block); message_block.wr_ptr (TAO_GIOP_MESSAGE_HEADER_LEN); size_t bt; int result = transport->send_message_block_chain (&message_block, bt); if (result == -1) { if (TAO_debug_level > 0) ACE_ERROR ((LM_ERROR, "(%P|%t) error closing connection %u, errno = %d\n", transport->id (), errno)); } transport->close_connection (); ACE_DEBUG ((LM_DEBUG, "(%P|%t) shut down transport, handle %d\n", transport-> id ())); } #endif int TAO_GIOP_Message_Base::send_reply_exception ( TAO_Transport *transport, TAO_OutputCDR &output, CORBA::ULong request_id, IOP::ServiceContextList *svc_info, CORBA::Exception *x ) { TAO_Pluggable_Reply_Params_Base reply_params; reply_params.request_id_ = request_id; reply_params.svc_ctx_.length (0); // We are going to send some data reply_params.argument_flag_ = 1; // Send back the service context we received. (RTCORBA relies on // this). reply_params.service_context_notowned (svc_info); reply_params.reply_status_ = TAO_GIOP_USER_EXCEPTION; if (CORBA::SystemException::_downcast (x) != 0) { reply_params.reply_status_ = TAO_GIOP_SYSTEM_EXCEPTION; } if (this->generate_exception_reply (output, reply_params, *x) == -1) return -1; return transport->send_message (output, 0, TAO_Transport::TAO_REPLY); } void TAO_GIOP_Message_Base::dump_msg (const char *label, const u_char *ptr, size_t len) { if (TAO_debug_level >= 5) { static const char digits[] = "0123456789ABCD"; static const char *names[] = { "Request", "Reply", "CancelRequest", "LocateRequest", "LocateReply", "CloseConnection", "MessageError", "Fragment" }; // Message name. const char *message_name = "UNKNOWN MESSAGE"; u_long slot = ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET]; if (slot < sizeof (names) / sizeof (names[0])) message_name = names[slot]; // Byte order. int byte_order = ptr[TAO_GIOP_MESSAGE_FLAGS_OFFSET] & 0x01; // Get the version info // CORBA::Octet major = ptr[TAO_GIOP_VERSION_MAJOR_OFFSET]; CORBA::Octet minor = ptr[TAO_GIOP_VERSION_MINOR_OFFSET]; // request/reply id. CORBA::ULong tmp = 0; CORBA::ULong *id = &tmp; char *tmp_id = 0; if (ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == TAO_GIOP_REQUEST || ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == TAO_GIOP_REPLY) { if (minor < 2) { // @@ Only works if ServiceContextList is empty.... tmp_id = (char * ) (ptr + TAO_GIOP_MESSAGE_HEADER_LEN + 4); } else { tmp_id = (char * ) (ptr + TAO_GIOP_MESSAGE_HEADER_LEN); } #if !defined (ACE_DISABLE_SWAP_ON_READ) if (byte_order == TAO_ENCAP_BYTE_ORDER) { id = reinterpret_cast (tmp_id); } else { ACE_CDR::swap_4 (tmp_id, reinterpret_cast (id)); } #else id = reinterpret_cast (tmp_id); #endif /* ACE_DISABLE_SWAP_ON_READ */ } // Print. ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - GIOP_Message_Base::dump_msg, " "%s GIOP v%c.%c msg, %d data bytes, %s endian, " "Type %s[%u]\n", ACE_TEXT_CHAR_TO_TCHAR (label), digits[ptr[TAO_GIOP_VERSION_MAJOR_OFFSET]], digits[ptr[TAO_GIOP_VERSION_MINOR_OFFSET]], len - TAO_GIOP_MESSAGE_HEADER_LEN , (byte_order == TAO_ENCAP_BYTE_ORDER) ? ACE_TEXT("my") : ACE_TEXT("other"), ACE_TEXT_CHAR_TO_TCHAR(message_name), *id)); if (TAO_debug_level >= 10) ACE_HEX_DUMP ((LM_DEBUG, (const char *) ptr, len, ACE_TEXT ("GIOP message"))); } } int TAO_GIOP_Message_Base::generate_locate_reply_header ( TAO_OutputCDR & /*cdr*/, TAO_Pluggable_Reply_Params_Base & /*params*/) { return 0; } int TAO_GIOP_Message_Base::is_ready_for_bidirectional (TAO_OutputCDR &msg) { // Get a parser for us TAO_GIOP_Message_Generator_Parser *parser = 0; CORBA::Octet major, minor = 0; msg.get_version (major, minor); // Get the state information that we need to use this->set_state (major, minor, parser); // We dont really know.. So ask the generator and parser objects that // we know. // @@ TODO: Need to make this faster, instead of making virtual // call, try todo the check within this class return parser->is_ready_for_bidirectional (); } void TAO_GIOP_Message_Base::set_queued_data_from_message_header ( TAO_Queued_Data *qd, const ACE_Message_Block &mb ) const { // @@CJC: Try leaving out the declaration for this->message_state_ // and see what pukes. I don't think we need it any more. TAO_GIOP_Message_State state; if (state.take_values_from_message_block (mb) == -1) { // what the heck do we do here?! qd->current_state_ = TAO_Queued_Data::INVALID; return; } // It'd be nice to have an abstract base for GIOP_Message_State // so that there could just be a line like: // qd->take_values_from (state); // Get the message information qd->byte_order_ = state.byte_order (); qd->major_version_ = state.giop_version ().major; qd->minor_version_ = state.giop_version ().minor; qd->more_fragments_ = state.more_fragments () ? 1 : 0; qd->request_id_ = state.request_id_; qd->msg_type_= message_type (state); qd->missing_data_bytes_ = state.payload_size (); } void TAO_GIOP_Message_Base::set_request_id_from_peek ( TAO_Queued_Data* qd, const ACE_Message_Block* mb ) const { // // No message data means no request header means no request ID. // if( mb == 0) { return ; } // // Each protocol version has the request ID located differently. // switch( qd->major_version_<<8 | qd->minor_version_) { case 0x0100: case 0x0101: // GIOP 1.0 and 1.1 have same location for request ID. // IOP { // typedef unsigned long ServiceId; // struct ServiceContext { // ServiceId context_id; // sequence context_data; // }; // typedef sequence ServiceContextList; // }; // // struct RequestHeader_1_0 { // and struct RequestHeader_1_1 { // IOP::ServiceContextList service_context; // unsigned long request_id; /// @todo: Need to skip the sequence thingies here. /// @todo: Without the skip, this is what the code does today. qd->request_id_ = TAO_GIOP_Message_State::read4( mb->rd_ptr(), qd->byte_order_) ; break ; case 0x0102: // GIOP 1.2 has request ID first. // struct RequestHeader_1_2 { // unsigned long request_id ; qd->request_id_ = TAO_GIOP_Message_State::read4( mb->rd_ptr(), qd->byte_order_) ; break ; } } int TAO_GIOP_Message_Base::check_for_valid_header ( const ACE_Message_Block &mb ) const { // NOTE! We don't hardcode the length of the header b/c header_length should // be eligible for inlining by pretty much any compiler, and it should return // a constant. The rest of this method is hard-coded and hand-optimized because // this method gets called A LOT. if (mb.length () < this->header_length ()) return -1; // Is finding that it's the right length and the magic bytes present // enough to declare it a valid header? I think so... register const char* h = mb.rd_ptr (); return (h[0] == 'G' && h[1] == 'I' && h[2] == 'O' && h[3] == 'P'); }