diff options
author | bala <balanatarajan@users.noreply.github.com> | 2000-12-06 21:40:25 +0000 |
---|---|---|
committer | bala <balanatarajan@users.noreply.github.com> | 2000-12-06 21:40:25 +0000 |
commit | 3640e32fd894c0bfacd4e56afdb7a719924adc59 (patch) | |
tree | 177a43800d8c80ec8d0f45de05474992f86df114 | |
parent | 216baeffdfcf0773c451ae3617a1cd3d01e03ada (diff) | |
download | ATCD-3640e32fd894c0bfacd4e56afdb7a719924adc59.tar.gz |
*** empty log message ***
35 files changed, 2707 insertions, 2754 deletions
diff --git a/TAO/tao/Acceptor_Impl.cpp b/TAO/tao/Acceptor_Impl.cpp index 6c830b19fc2..f175a411495 100644 --- a/TAO/tao/Acceptor_Impl.cpp +++ b/TAO/tao/Acceptor_Impl.cpp @@ -78,6 +78,13 @@ TAO_Concurrency_Strategy<SVC_HANDLER>::activate_svc_handler (SVC_HANDLER *sh, arg) == -1) return -1; + // The service handler has been activated. Now cache the handler. + if (sh->add_handler_to_cache () == -1) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%P|%t) Could not add the handler to Cache \n"))); + } + TAO_Server_Strategy_Factory *f = this->orb_core_->server_factory (); diff --git a/TAO/tao/GIOP_Message_Base.cpp b/TAO/tao/GIOP_Message_Base.cpp index b5cae0aca32..329d82f786d 100644 --- a/TAO/tao/GIOP_Message_Base.cpp +++ b/TAO/tao/GIOP_Message_Base.cpp @@ -110,8 +110,8 @@ TAO_GIOP_Message_Base::generate_request_header ( // Now call the implementation for the rest of the header if (!this->generator_parser_->write_request_header (op, - spec, - cdr)) + spec, + cdr)) { if (TAO_debug_level > 4) ACE_ERROR_RETURN ((LM_ERROR, @@ -294,7 +294,6 @@ TAO_GIOP_Message_Base::read_message (TAO_Transport *transport, } } - cout << "Amba " <<endl; return this->message_state_.is_complete (); } @@ -394,7 +393,7 @@ TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport, orb_core->leader_follower ().set_upcall_thread (); // Reset the output CDR stream. - // @@@@Should we be doing this here? + // @@@@Is it necessary here? this->output_->reset (); // @@ -415,7 +414,8 @@ TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport, // @@@ Needed for DOORS // orb_core->services_log_msg_rcv (this->message_state_); - // Reset the message state. + // Reset the message state. Now, we are ready for the next nested + // upcall if any. this->message_state_.reset (0); // We know we have some request message. Check whether it is a @@ -1329,3 +1329,12 @@ TAO_GIOP_Message_Base::dump_msg (const char *label, } } + + +int +TAO_GIOP_Message_Base::generate_locate_reply_header ( + TAO_OutputCDR & /*cdr*/, + TAO_Pluggable_Reply_Params & /*params*/) +{ + return 0; +} diff --git a/TAO/tao/GIOP_Message_Base.h b/TAO/tao/GIOP_Message_Base.h index ebfc547411e..6e76ebc810c 100644 --- a/TAO/tao/GIOP_Message_Base.h +++ b/TAO/tao/GIOP_Message_Base.h @@ -182,7 +182,7 @@ private: /// Write the locate reply header virtual int generate_locate_reply_header ( TAO_OutputCDR & /*cdr*/, - TAO_Pluggable_Reply_Params & /*params*/) { return 0;}; + TAO_Pluggable_Reply_Params & /*params*/); private: diff --git a/TAO/tao/GIOP_Message_Lite.cpp b/TAO/tao/GIOP_Message_Lite.cpp index 3e919276636..52f25f125ef 100644 --- a/TAO/tao/GIOP_Message_Lite.cpp +++ b/TAO/tao/GIOP_Message_Lite.cpp @@ -9,14 +9,20 @@ #include "tao/TAO_Server_Request.h" #include "tao/GIOP_Message_Headers.h" #include "tao/target_specification.h" +#include "tao/Leader_Follower.h" #if !defined (__ACE_INLINE__) # include "tao/GIOP_Message_Lite.i" #endif /* __ACE_INLINE__ */ +static const size_t TAO_GIOP_LITE_HEADER_LEN = 5; +static const size_t TAO_GIOP_LITE_MESSAGE_SIZE_OFFSET = 0; +static const size_t TAO_GIOP_LITE_MESSAGE_TYPE_OFFSET = 4; TAO_GIOP_Message_Lite::TAO_GIOP_Message_Lite (TAO_ORB_Core *orb_core) - : cdr_buffer_alloc_ ( + :message_state_ (orb_core), + output_ (0), + cdr_buffer_alloc_ ( orb_core->resource_factory ()->output_cdr_buffer_allocator () ), cdr_dblock_alloc_ ( @@ -40,193 +46,252 @@ TAO_GIOP_Message_Lite::TAO_GIOP_Message_Lite (TAO_ORB_Core *orb_core) } -CORBA::Boolean -TAO_GIOP_Message_Lite::write_protocol_header (TAO_Pluggable_Message_Type t, - TAO_OutputCDR &msg) +TAO_GIOP_Message_Lite::~TAO_GIOP_Message_Lite (void) { - // Reset the CDR stream, we are going to generate a completely new - // message. - msg.reset (); + // Explicitly call the destructor of the output CDR first. They need + // the allocators during destruction. + delete this->output_; + + // Then call the destructor of our allocators + if (this->cdr_dblock_alloc_ != 0) + this->cdr_dblock_alloc_->remove (); + // delete this->cdr_dblock_alloc_; + + if (this->cdr_buffer_alloc_ != 0) + this->cdr_buffer_alloc_->remove (); + // delete this->cdr_buffer_alloc_; +} - // @@ Bala: this is something to think harder about: right now we - // leave the space to store the length, and later we set it, but the - // way we do it is CDR specific... Maybe the XXXStream classes - // should support a 'save your current position' method that returns - // a 'Mememto' (check the GoF book), later the CDR stream could be - // restored to that state, and the size written to it. - // @@ Then again, i don't know how would that work with fragments - // (eventually we may want TAO to generate fragments), or protocols - // based on chunking.... - // - // Write a dummy <size> later it is set to the right value... @@ - CORBA::ULong size = 0; - msg.write_ulong (size); - TAO_GIOP_Message_Type type = TAO_GIOP_MESSAGERROR; +void +TAO_GIOP_Message_Lite::init (CORBA::Octet, + CORBA::Octet) +{ + return; +} - // First convert the Pluggable type to the GIOP specific type. - // @@ Bala: this looks like a such a waste of time. There is no - // reason to have those 'generic' values if they will be transformed - // back an forth. It makes more sense to have several methods: - // - // write_request_header() - // write_reply_header() - // write_..._header() - // - // exposed through the Pluggable_Messaging interface. The - // write_protocol_header() should not be exposed through the generic - // interface, but it may be a private method used to implement the - // methods above.... Then you can use the GIOP types directly. - // - switch (t) +int +TAO_GIOP_Message_Lite::parse_header (void) +{ + // Get the read pointer + char *buf = this->message_state_.cdr.rd_ptr (); + + // @@ Bala: i added the following comment, does it make sense? + // In GIOPLite the version, byte order info, etc. are hardcoded, and + // not transmitted over the wire. + this->message_state_.byte_order = TAO_ENCAP_BYTE_ORDER; + this->message_state_.giop_version.major = TAO_DEF_GIOP_MAJOR; + this->message_state_.giop_version.minor = TAO_DEF_GIOP_MINOR; + + // Get the message type. + this->message_state_.message_type = buf[TAO_GIOP_LITE_MESSAGE_TYPE_OFFSET]; + + this->message_state_.cdr.reset_byte_order (this->message_state_.byte_order); + + // The first bytes are the length of the message. + this->message_state_.cdr.read_ulong (this->message_state_.message_size); + + return 0; +} + + +void +TAO_GIOP_Message_Lite::reset (int reset_flag) +{ + // Reset the message state + this->message_state_.reset (reset_flag); + + //What else??? +} + + +int +TAO_GIOP_Message_Lite::generate_request_header ( + TAO_Operation_Details &op, + TAO_Target_Specification &spec, + TAO_OutputCDR &cdr + ) +{ + // Write the GIOPLite header first + if (!this->write_protocol_header (TAO_GIOP_REQUEST, + cdr)) { - case (TAO_PLUGGABLE_MESSAGE_REQUEST): - type = TAO_GIOP_REQUEST; - break; - case (TAO_PLUGGABLE_MESSAGE_REPLY): - type = TAO_GIOP_REPLY; - break; - case (TAO_PLUGGABLE_MESSAGE_CANCELREQUEST): - type = TAO_GIOP_CANCELREQUEST; - break; - case (TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST): - type = TAO_GIOP_LOCATEREQUEST; - break; - case (TAO_PLUGGABLE_MESSAGE_LOCATEREPLY): - type = TAO_GIOP_LOCATEREPLY; - break; - case (TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION): - type = TAO_GIOP_CLOSECONNECTION; - break; - case (TAO_PLUGGABLE_MESSAGE_MESSAGERROR): - type = TAO_GIOP_MESSAGERROR; - break; - case (TAO_PLUGGABLE_MESSAGE_FRAGMENT): - type = TAO_GIOP_FRAGMENT; - break; + if (TAO_debug_level > 3) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("(%P|%t) Error in writing GIOP header \n")), + -1); + } - // Write the message type. - msg.write_octet ((CORBA::Octet) type); - return 1; + // Now call the implementation for the rest of the header + if (!this->write_request_header (op, + spec, + cdr)) + { + if (TAO_debug_level > 4) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("(%P|%t) Error in writing request header \n")), + -1); + } + + return 0; + } int -TAO_GIOP_Message_Lite::handle_input (TAO_Transport *transport, - TAO_ORB_Core * /* orb_core */, - TAO_Message_State_Factory &mesg_state, - ACE_Time_Value *max_wait_time) +TAO_GIOP_Message_Lite::generate_locate_request_header ( + TAO_Operation_Details &op, + TAO_Target_Specification &spec, + TAO_OutputCDR &cdr + ) { - // @@ Bala: only the pluggable transport protocols can invoke the - // handle_input() method, but they must know what messaging protocol - // is in place already. In consequence this method should not be - // part of the public interface for Pluggable_Messaging.... + // Write the GIOPLite header first + if (!this->write_protocol_header (TAO_GIOP_LOCATEREQUEST, + cdr)) + { + if (TAO_debug_level > 3) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("(%P|%t) Error in writing GIOPLite header \n")), + -1); + + } + + // Now call the implementation for the rest of the header + if (!this->write_locate_request_header (op.request_id (), + spec, + cdr)) + { + if (TAO_debug_level > 4) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("(%P|%t) Error in writing locate request header \n")), + -1); + } - TAO_GIOP_Message_State *state = - ACE_dynamic_cast (TAO_GIOP_Message_State *, &mesg_state); + return 0; +} - // @@ Bala: can we eliminate fragments for GIOPLite? I think we can - // and then this code should be significantly simplified. +int +TAO_GIOP_Message_Lite::generate_reply_header ( + TAO_OutputCDR &cdr, + TAO_Pluggable_Reply_Params ¶ms + ) +{ + // Write the GIOPLite header first + if (!this->write_protocol_header (TAO_GIOP_REPLY, + cdr)) + { + if (TAO_debug_level > 3) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("(%P|%t) Error in writing GIOPLite header \n")), + -1); + } - if (state->header_received () == 0) + // Now call the implementation for the rest of the header + if (!this->write_reply_header (cdr, + params)) + { + if (TAO_debug_level > 4) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("(%P|%t) Error in writing reply header \n")), + -1); + } + + return 0; +} + + +int +TAO_GIOP_Message_Lite::read_message (TAO_Transport *transport, + int /*block */, + ACE_Time_Value *max_wait_time) +{ + if (this->message_state_.header_received () == 0) { int retval = TAO_GIOP_Utils::read_bytes_input (transport, - state->cdr, - TAO_GIOP_LITE_HEADER_LEN, + message_state_.cdr, + TAO_GIOP_LITE_HEADER_LEN , max_wait_time); - if (retval == -1 && TAO_debug_level > 0) + if (retval == -1) { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - \n") - ACE_TEXT ("TAO_GIOP_Message_Lite::handle_input \n"))); + if (TAO_debug_level > 0) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - \n") + ACE_TEXT ("TAO_GIOP_Message_Lite::read_message \n"))); + } + return -1; } - // After we read, we assume that everything is fine. We dont do - // any sanity check of the incoming header. - // Read the rest of the stuff. That should be read by the // corresponding states - if (this->parse_header (state) == -1) + if (this->parse_header () == -1) { if (TAO_debug_level > 0) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t|%N%l) - %p\n" - "TAO_GIOP_Message_Lite::handle_input")); - } - + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t|%N%l) -\n"), + ACE_TEXT ("TAO_GIOP_Message_Lite::handle_input \n"))); return -1; } - if (state->cdr.grow (TAO_GIOP_LITE_HEADER_LEN + - state->message_size) == -1) + if (this->message_state_.cdr.grow (TAO_GIOP_LITE_HEADER_LEN + + this->message_state_.message_size) == -1) { if (TAO_debug_level > 0) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t|%N|%l) - %p\n"), - ACE_TEXT ("ACE_CDR::grow"))); - } - + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t|%N|%l) - %p\n"), + ACE_TEXT ("ACE_CDR::grow"))); return -1; } // Growing the buffer may have reset the rd_ptr(), but we want // to leave it just after the GIOP header (that was parsed // already); - state->cdr.skip_bytes (TAO_GIOP_LITE_HEADER_LEN); + this->message_state_.cdr.skip_bytes (TAO_GIOP_LITE_HEADER_LEN); } size_t missing_data = - state->message_size - state->current_offset; + this->message_state_.message_size - this->message_state_.current_offset; ssize_t n = TAO_GIOP_Utils::read_buffer (transport, - state->cdr.rd_ptr () + state->current_offset, + this->message_state_.cdr.rd_ptr () + + this->message_state_.current_offset, missing_data, max_wait_time); - if (n == -1) + + if (n == -1) { if (TAO_debug_level > 0) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - %p\n"), - ACE_TEXT ("TAO_GIOP_Message_Lite::handle_input, ") - ACE_TEXT ("read_buffer[1]"))); - } - + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - %p\n"), + ACE_TEXT ("TAO_GIOP_Message_Lite::handle_input, read_buffer[1] \n"))); return -1; } else if (n == 0) { if (TAO_debug_level > 0) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - %p\n"), - ACE_TEXT ("TAO_GIOP_Message_Lite::handle_input, ") - ACE_TEXT ("read_buffer[2]"))); - } - + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - %p\n"), + ACE_TEXT ("TAO_GIOP_Message_Lite::handle_input, read_buffer[2]\n"))); return -1; } + this->message_state_.current_offset += n; - state->current_offset += n; - - if (state->current_offset == state->message_size) + if (this->message_state_.current_offset == + this->message_state_.message_size) { if (TAO_debug_level >= 4) { - size_t header_len = TAO_GIOP_LITE_HEADER_LEN; + size_t header_len = TAO_GIOP_LITE_HEADER_LEN ; - // Need to include GIOPlite too. - char *buf = state->cdr.rd_ptr (); + char *buf = this->message_state_.cdr.rd_ptr (); buf -= header_len; - size_t msg_len = state->cdr.length () + header_len; + size_t msg_len = this->message_state_.cdr.length () + header_len; this->dump_msg ("recv", ACE_reinterpret_cast (u_char *, buf), @@ -234,54 +299,15 @@ TAO_GIOP_Message_Lite::handle_input (TAO_Transport *transport, } } - return state->is_complete (); + return this->message_state_.is_complete (); } - -CORBA::Boolean -TAO_GIOP_Message_Lite::write_message_header ( - const TAO_Operation_Details &opdetails, - TAO_Pluggable_Header_Type header_type, - TAO_Target_Specification &spec, - TAO_OutputCDR &cdr - ) -{ - // @@ Bala: it is better to expose the methods directly, the - // higher level components in the ORB *know* if they want to send a - // request or a locate request. The switch is just a waste of time - // and breaks type safety.... - - switch (header_type) - { - case TAO_PLUGGABLE_MESSAGE_REQUEST_HEADER: - return this->write_request_header (opdetails, - spec, - cdr); - case TAO_PLUGGABLE_MESSAGE_LOCATE_REQUEST_HEADER: - return this->write_locate_request_header (opdetails.request_id (), - spec, - cdr); - default: - if (TAO_debug_level > 0) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_TEXT ("(%P|%t|%N|%l) Wrong header type \n")), - 0); - - } - - return 1; -} - int -TAO_GIOP_Message_Lite::send_message (TAO_Transport *transport, - TAO_OutputCDR &stream, - ACE_Time_Value *max_wait_time, - TAO_Stub *stub, - int two_way) +TAO_GIOP_Message_Lite::format_message (TAO_OutputCDR &stream) { // Get the header length - const size_t header_len = TAO_GIOP_LITE_HEADER_LEN; + const size_t header_len = TAO_GIOP_LITE_HEADER_LEN ; // Get the message size offset const size_t offset = TAO_GIOP_LITE_MESSAGE_SIZE_OFFSET; @@ -306,331 +332,214 @@ TAO_GIOP_Message_Lite::send_message (TAO_Transport *transport, *ACE_reinterpret_cast (CORBA::ULong *, buf + offset) = bodylen; #else if (!stream->do_byte_swap ()) - { - *ACE_reinterpret_cast (CORBA::ULong *, - buf + offset) = bodylen; - } + *ACE_reinterpret_cast (CORBA::ULong *, + buf + offset) = bodylen; else - { - ACE_CDR::swap_4 (ACE_reinterpret_cast (char *, - &bodylen), - buf + offset); - } + ACE_CDR::swap_4 (ACE_reinterpret_cast (char *, + &bodylen), + buf + offset); #endif /* ACE_ENABLE_SWAP_ON_WRITE */ - this->dump_msg ("send", - ACE_reinterpret_cast (u_char *, - buf), - stream.length ()); + // Strictly speaking, should not need to loop here because the + // socket never gets set to a nonblocking mode ... some Linux + // versions seem to need it though. Leaving it costs little. + if (TAO_debug_level > 2) + { + this->dump_msg ("send", + ACE_reinterpret_cast (u_char *, + buf), + stream.length ()); + } - return this->transport_message (transport, - stream, - two_way, - stub, - max_wait_time); + return 0; } -int -TAO_GIOP_Message_Lite::parse_reply (TAO_Message_State_Factory &mesg_state, - TAO_Pluggable_Reply_Params ¶ms) +TAO_Pluggable_Message_Type +TAO_GIOP_Message_Lite::message_type (void) { - // Cast to the GIOP Message state - TAO_GIOP_Message_State *state = - ACE_dynamic_cast (TAO_GIOP_Message_State *, - &mesg_state); - - switch (state->message_type) + switch (this->message_state_.message_type) { case TAO_GIOP_REQUEST: - // In GIOP 1.0 and GIOP 1.1 this is an error, - ACE_ERROR_RETURN ((LM_ERROR, - ACE_TEXT ("TAO (%P|%t) %N:%l TAO_GIOP_Message_Lite::parse_reply: ") - ACE_TEXT ("request.\n")), - -1); - - case TAO_GIOP_CANCELREQUEST: case TAO_GIOP_LOCATEREQUEST: - // Errors - case TAO_GIOP_CLOSECONNECTION: - default: - ACE_ERROR_RETURN ((LM_ERROR, - ACE_TEXT ("TAO (%P|%t) %N:%l parse_reply: ") - ACE_TEXT ("wrong message.\n")), - -1); + return TAO_PLUGGABLE_MESSAGE_REQUEST; + case TAO_GIOP_LOCATEREPLY: case TAO_GIOP_REPLY: - // Handle after the switch + return TAO_PLUGGABLE_MESSAGE_REPLY; + + case TAO_GIOP_CLOSECONNECTION: + return TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION; + + case TAO_GIOP_CANCELREQUEST: + case TAO_GIOP_MESSAGERROR: case TAO_GIOP_FRAGMENT: // Never happens: why?? - break; + default: + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) %N:%l message_type : ") + ACE_TEXT ("wrong message.\n"))); + return TAO_PLUGGABLE_MESSAGE_MESSAGERROR; + } - // Read the request id - if (!state->cdr.read_ulong (params.request_id_)) - { - if (TAO_debug_level > 0) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t|%N|%l) : ") - ACE_TEXT ("TAO_GIOP_Message_Lite::parse_reply, ") - ACE_TEXT ("extracting request id"))); - } + // In case of some errors + return TAO_PLUGGABLE_MESSAGE_MESSAGERROR; +} - return -1; - } - CORBA::ULong rep_stat = 0; +int +TAO_GIOP_Message_Lite::process_request_message (TAO_Transport *transport, + TAO_ORB_Core *orb_core) +{ + // Set the upcall thread + orb_core->leader_follower ().set_upcall_thread (); - // and the reply status type. status can be NO_EXCEPTION, - // SYSTEM_EXCEPTION, USER_EXCEPTION, LOCATION_FORWARD - // CAnnot handle LOCATION_FORWARD_PERM here - if (!state->cdr.read_ulong (rep_stat)) - { - if (TAO_debug_level > 0) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t|%N|%l) : ") - ACE_TEXT ("TAO_GIOP_Message_Lite::parse_reply, ") - ACE_TEXT ("extracting reply status\n"))); - } + // Reset the output CDR stream. + // @@@@Is it necessary here? + this->output_->reset (); - return -1; - } + // + // Take out all the information from the <message_state> and reset + // it so that nested upcall on the same transport can be handled. + // - // Pass the right Pluggable interface code to the transport layer - switch (rep_stat) - { - // Request completed successfully - case TAO_GIOP_NO_EXCEPTION: - params.reply_status_ = TAO_PLUGGABLE_MESSAGE_NO_EXCEPTION; - break; + // Notice that the message_state is only modified in one thread at a + // time because the reactor does not call handle_input() for the + // same Event_Handler in two threads at the same time. - // Request terminated with user exception - case TAO_GIOP_USER_EXCEPTION: - params.reply_status_ = TAO_PLUGGABLE_MESSAGE_USER_EXCEPTION; - break; - // Request terminated with system exception - case TAO_GIOP_SYSTEM_EXCEPTION: - params.reply_status_ = TAO_PLUGGABLE_MESSAGE_SYSTEM_EXCEPTION; - break; - // Reply is a location forward type - case TAO_GIOP_LOCATION_FORWARD: - params.reply_status_ = TAO_PLUGGABLE_MESSAGE_LOCATION_FORWARD; - break; - default: - if (TAO_debug_level > 0) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%N|%l) Unknown reply status \n"))); - } - } - return 0; -} + // Steal the input CDR from the message state. + TAO_InputCDR input_cdr (ACE_InputCDR::Transfer_Contents (this->message_state_.cdr), + orb_core); + // Send the message state for the service layer like FT to log the + // messages + // @@@ Needed for DOORS + // orb_core->services_log_msg_rcv (this->message_state_); -int -TAO_GIOP_Message_Lite::process_client_message (TAO_Transport *transport, - TAO_ORB_Core *orb_core, - TAO_InputCDR &input, - CORBA::Octet message_type) -{ + // Reset the message state. Now, we are ready for the next nested + // upcall if any. + this->message_state_.reset (0); - switch (message_type) + // We know we have some request message. Check whether it is a + // GIOP_REQUEST or GIOP_LOCATE_REQUEST to take action. + switch (this->message_state_.message_type) { case TAO_GIOP_REQUEST: // Should be taken care by the state specific invocations. They // could raise an exception or write things in the output CDR // stream - this->process_client_request (transport, + return this->process_request (transport, orb_core, - input); - break; + input_cdr); case TAO_GIOP_LOCATEREQUEST: - this->process_client_locate (transport, - orb_core, - input); - - break; - case TAO_GIOP_MESSAGERROR: - case TAO_GIOP_REPLY: - case TAO_GIOP_LOCATEREPLY: - case TAO_GIOP_CLOSECONNECTION: + return this->process_locate_request (transport, + orb_core, + input_cdr); default: - if (TAO_debug_level > 0) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) Illegal message ") - ACE_TEXT ("received by server\n"))); - } - - return this->send_error (transport); + return -1; } - return 0; + return -1; } -// @@ Bala: if you use ACE_UNUSED_ARG *please* put it at the beginning -// of the function, where there is some hope that somebody will find -// it! -CORBA::Boolean -TAO_GIOP_Message_Lite::write_reply_header ( - TAO_OutputCDR &output, - TAO_Pluggable_Reply_Params &reply, - CORBA::Environment & /* ACE_TRY_ENV */ +int +TAO_GIOP_Message_Lite::process_reply_message ( + TAO_Pluggable_Reply_Params ¶ms ) - ACE_THROW_SPEC ((CORBA::SystemException)) { - // Write the GIOP Lite header first - this->write_protocol_header (TAO_PLUGGABLE_MESSAGE_REPLY, - output); - - // Write the request ID - output.write_ulong (reply.request_id_); - - // Write the reply status - switch (reply.reply_status_) + // We know we have some reply message. Check whether it is a + // GIOP_REPLY or GIOP_LOCATE_REPLY to take action. + switch (this->message_state_.message_type) { - case TAO_PLUGGABLE_MESSAGE_NO_EXCEPTION: - output.write_ulong (TAO_GIOP_NO_EXCEPTION); - break; - case TAO_PLUGGABLE_MESSAGE_LOCATION_FORWARD: - output.write_ulong (TAO_GIOP_LOCATION_FORWARD); - break; - case TAO_PLUGGABLE_MESSAGE_SYSTEM_EXCEPTION: - output.write_ulong (TAO_GIOP_SYSTEM_EXCEPTION); - break; - case TAO_PLUGGABLE_MESSAGE_USER_EXCEPTION: - output.write_ulong (TAO_GIOP_USER_EXCEPTION); - break; + case TAO_GIOP_REPLY: + // Should be taken care by the state specific parsing + return this->parse_reply (this->message_state_.cdr, + params); + case TAO_GIOP_LOCATEREPLY: + // We call parse_reply () here because, the message format for + // the LOCATEREPLY & REPLY are same. + return this->parse_reply (this->message_state_.cdr, + params); default: - // Some other specifc exception - output.write_ulong (reply.reply_status_); - break; + return -1; } - - return 1; } -CORBA::Boolean -TAO_GIOP_Message_Lite::write_request_header ( - const TAO_Operation_Details &opdetails, - TAO_Target_Specification &spec, - TAO_OutputCDR &out_stream - ) +int +TAO_GIOP_Message_Lite::generate_exception_reply ( + TAO_OutputCDR &cdr, + TAO_Pluggable_Reply_Params ¶ms, + CORBA::Exception &x + ) { - out_stream << opdetails.request_id (); - - const CORBA::Octet response_flags = opdetails.response_flags (); - - // @@ (JP) Temporary hack until all of GIOP 1.2 is implemented. - if (response_flags == TAO_TWOWAY_RESPONSE_FLAG) - { - out_stream << CORBA::Any::from_octet (1); - } - // Sync scope - ignored by server if request is not oneway. - else if (response_flags == CORBA::Octet (TAO::SYNC_WITH_TRANSPORT) - || response_flags == CORBA::Octet (TAO::SYNC_NONE) - || response_flags == CORBA::Octet (TAO::SYNC_EAGER_BUFFERING) - || response_flags == CORBA::Octet (TAO::SYNC_DELAYED_BUFFERING)) - { - // No response required. - out_stream << CORBA::Any::from_octet (0); - } - else if (response_flags == CORBA::Octet (TAO::SYNC_WITH_SERVER)) - { - // Return before dispatching servant. We're also setting the high - // bit here. This is a temporary fix until the rest of GIOP 1.2 is - // implemented in TAO. - out_stream << CORBA::Any::from_octet (129); - } - else if (response_flags == CORBA::Octet (TAO::SYNC_WITH_TARGET)) - { - // Return after dispatching servant. - out_stream << CORBA::Any::from_octet (3); - } - else - { - // Until more flags are defined by the OMG. - return 0; - } - - // In this case we cannot recognise anything other than the Object - // key as the address disposition variable. But we do a sanity check - // anyway. - const TAO_ObjectKey *key = spec.object_key (); + // A new try/catch block, but if something goes wrong now we have no + // hope, just abort. + ACE_DECLARE_NEW_CORBA_ENV; - if (key != 0) + ACE_TRY { - // Put in the object key - out_stream << *key; + // Make the GIOP & reply header. They are version specific. + this->write_reply_header (cdr, + params); + x._tao_encode (cdr, ACE_TRY_ENV); + ACE_TRY_CHECK; } - else + ACE_CATCH (CORBA_Exception, ex) { - if (TAO_debug_level) - { - ACE_DEBUG (( - LM_DEBUG, - ACE_TEXT ("(%N |%l) Unable to handle this request \n") - )); - } + // Now we know that while handling the error an other error + // happened -> no hope, close connection. - return 0; + // Close the handle. + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t|%N|%l) cannot marshal exception, ") + ACE_TEXT ("generate_exception_reply ()"))); + return -1; } + ACE_ENDTRY; + ACE_CHECK_RETURN (-1); - out_stream.write_string (opdetails.opname_len (), - opdetails.opname ()); - - return 1; + return 0; } -CORBA::Boolean -TAO_GIOP_Message_Lite::write_locate_request_header ( - CORBA::ULong request_id, - TAO_Target_Specification &spec, - TAO_OutputCDR &msg - ) -{ - msg << request_id; - // In this case we cannot recognise anything other than the Object - // key as the address disposition variable. But we do a sanity check - // anyway. - const TAO_ObjectKey *key = spec.object_key (); +int +TAO_GIOP_Message_Lite::write_protocol_header ( + TAO_GIOP_Message_Type t, + TAO_OutputCDR &msg) +{ + // Reset the message type + msg.reset (); - if (key) - { - // Put in the object key - msg << *key; - } - else - { - if (TAO_debug_level) - { - ACE_DEBUG (( - LM_DEBUG, - ACE_TEXT ("(%N |%l) Unable to handle this request \n") - )); - } + // @@ Bala: this is something to think harder about: right now we + // leave the space to store the length, and later we set it, but the + // way we do it is CDR specific... Maybe the XXXStream classes + // should support a 'save your current position' method that returns + // a 'Mememto' (check the GoF book), later the CDR stream could be + // restored to that state, and the size written to it. + // @@ Then again, i don't know how would that work with fragments + // (eventually we may want TAO to generate fragments), or protocols + // based on chunking.... + // + // Write a dummy <size> later it is set to the right value... @@ + CORBA::ULong size = 0; + msg.write_ulong (size); - return 0; - } + msg.write_octet ((CORBA::Octet) t); return 1; } int -TAO_GIOP_Message_Lite::process_client_request ( - TAO_Transport *transport, - TAO_ORB_Core *orb_core, - TAO_InputCDR &input - ) +TAO_GIOP_Message_Lite::process_request (TAO_Transport *transport, + TAO_ORB_Core *orb_core, + TAO_InputCDR &cdr) { // This will extract the request header, set <response_required> // and <sync_with_server> as appropriate. TAO_ServerRequest request (this, - input, + cdr, *this->output_, transport, orb_core); @@ -649,11 +558,8 @@ TAO_GIOP_Message_Lite::process_client_request ( // Throw an exception if the if (parse_error != 0) - { - ACE_TRY_THROW (CORBA::MARSHAL (TAO_DEFAULT_MINOR_CODE, - CORBA::COMPLETED_NO)); - } - + ACE_TRY_THROW (CORBA::MARSHAL (TAO_DEFAULT_MINOR_CODE, + CORBA::COMPLETED_NO)); request_id = request.request_id (); response_required = request.response_expected (); @@ -671,7 +577,7 @@ TAO_GIOP_Message_Lite::process_client_request ( if (!CORBA::is_nil (forward_to.in ())) { // We should forward to another object... - TAO_Pluggable_Reply_Params reply_params; + TAO_Pluggable_Reply_Params reply_params (orb_core); reply_params.request_id_ = request_id; reply_params.reply_status_ = TAO_GIOP_LOCATION_FORWARD; reply_params.svc_ctx_.length (0); @@ -680,14 +586,13 @@ TAO_GIOP_Message_Lite::process_client_request ( // Request. (Important for RT CORBA). reply_params.service_context_notowned (&request.service_info ()); - // Make the GIOP header and Reply header. - this->write_reply_header (*this->output_, - reply_params); + // Make the GIOP header and Reply header + this->generate_reply_header (*this->output_, + reply_params); *this->output_ << forward_to.in (); - int result = this->send_message (transport, - *this->output_); + int result = transport->send_message (*this->output_); if (result == -1) { if (TAO_debug_level > 0) @@ -720,8 +625,8 @@ TAO_GIOP_Message_Lite::process_client_request ( if (TAO_debug_level > 0) { ACE_ERROR ((LM_ERROR, - ACE_TEXT ("TAO: (%P|%t|%N|%l) %p: cannot ") - ACE_TEXT ("send exception\n"), + ACE_TEXT ("TAO: (%P|%t|%N|%l) %p: ") + ACE_TEXT ("cannot send exception\n"), ACE_TEXT ("process_connector_request ()"))); } @@ -762,10 +667,13 @@ TAO_GIOP_Message_Lite::process_client_request ( if (response_required) { - CORBA::UNKNOWN exception - (CORBA::SystemException::_tao_minor_code - (TAO_UNHANDLED_SERVER_CXX_EXCEPTION, 0), - CORBA::COMPLETED_MAYBE); + CORBA::UNKNOWN exception ( + CORBA::SystemException::_tao_minor_code ( + TAO_UNHANDLED_SERVER_CXX_EXCEPTION, + 0 + ), + CORBA::COMPLETED_MAYBE + ); result = this->send_reply_exception (transport, orb_core, @@ -777,8 +685,8 @@ TAO_GIOP_Message_Lite::process_client_request ( if (TAO_debug_level > 0) { ACE_ERROR ((LM_ERROR, - ACE_TEXT ("TAO: (%P|%t|%N|%l) %p: cannot ") - ACE_TEXT ("send exception\n"), + ACE_TEXT ("TAO: (%P|%t|%N|%l) %p: ") + ACE_TEXT ("cannot send exception\n"), ACE_TEXT ("process_connector_request ()"))); } @@ -805,10 +713,11 @@ TAO_GIOP_Message_Lite::process_client_request ( return 0; } + int -TAO_GIOP_Message_Lite::process_client_locate (TAO_Transport *transport, - TAO_ORB_Core* orb_core, - TAO_InputCDR &input) +TAO_GIOP_Message_Lite::process_locate_request (TAO_Transport *transport, + TAO_ORB_Core* orb_core, + TAO_InputCDR &input) { // This will extract the request header, set <response_required> as // appropriate. @@ -817,7 +726,7 @@ TAO_GIOP_Message_Lite::process_client_locate (TAO_Transport *transport, TAO_GIOP_Locate_Status_Msg status_info; - // Defaulting + // Defaulting. status_info.status = TAO_GIOP_UNKNOWN_OBJECT; CORBA::Boolean response_required = 1; @@ -850,13 +759,13 @@ TAO_GIOP_Message_Lite::process_client_locate (TAO_Transport *transport, parse_error = 1; CORBA::ULong req_id = locate_request.request_id (); - // We will send the reply and let not the ServerRequest class - // send the reply - CORBA::Boolean deferred_flag = 1; + // We will send the reply. The ServerRequest class need not send + // the reply + CORBA::Boolean deferred_reply = 1; TAO_ServerRequest server_request (this, req_id, response_required, - deferred_flag, + deferred_reply, tmp_key, "_non_existent", dummy_output, @@ -879,64 +788,49 @@ TAO_GIOP_Message_Lite::process_client_locate (TAO_Transport *transport, ACE_TRY_ENV); ACE_TRY_CHECK; - if (!CORBA::is_nil (forward_to.in ())) { status_info.status = TAO_GIOP_OBJECT_FORWARD; status_info.forward_location_var = forward_to; - ACE_DEBUG (( - LM_DEBUG, - ACE_TEXT ("handle_locate has been called: forwarding\n") - )); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("handle_locate has been called: forwarding\n"))); } else if (server_request.exception_type () == TAO_GIOP_NO_EXCEPTION) { // We got no exception, so the object is here. status_info.status = TAO_GIOP_OBJECT_HERE; - if (TAO_debug_level > 0) - { - ACE_DEBUG (( - LM_DEBUG, - ACE_TEXT ("TAO: (%P|%t) handle_locate() : found\n") - )); - } + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO: (%P|%t) handle_locate() : found\n"))); } else { - status_info.forward_location_var = - server_request.forward_location (); + status_info.forward_location_var = server_request.forward_location (); if (!CORBA::is_nil (status_info.forward_location_var.in ())) { status_info.status = TAO_GIOP_OBJECT_FORWARD; ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("handle_locate has been called: ") - ACE_TEXT ("forwarding\n"))); + ACE_TEXT ("handle_locate has been called: forwarding\n"))); } else { // Normal exception, so the object is not here status_info.status = TAO_GIOP_UNKNOWN_OBJECT; ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("handle_locate has been called: ") - ACE_TEXT ("not here\n"))); + ACE_TEXT ("handle_locate has been called: not here\n"))); } } } + ACE_CATCHANY { // Normal exception, so the object is not here status_info.status = TAO_GIOP_UNKNOWN_OBJECT; - if (TAO_debug_level > 0) - { - ACE_DEBUG (( - LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) TAO_GIOP::process_server_locate - ") - ACE_TEXT ("CORBA exception raised\n") - )); - } + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) TAO_GIOP::process_server_locate - ") + ACE_TEXT ("CORBA exception raised\n"))); } #if defined (TAO_HAS_EXCEPTIONS) ACE_CATCHALL @@ -944,24 +838,271 @@ TAO_GIOP_Message_Lite::process_client_locate (TAO_Transport *transport, // Normal exception, so the object is not here status_info.status = TAO_GIOP_UNKNOWN_OBJECT; if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) TAO_GIOP::process_server_locate - ") + ACE_TEXT ("C++ exception raised\n"))); + } +#endif /* TAO_HAS_EXCEPTIONS */ + ACE_ENDTRY; + + return this->make_send_locate_reply (transport, + locate_request, + status_info); +} + + +int +TAO_GIOP_Message_Lite::make_send_locate_reply ( + TAO_Transport *transport, + TAO_GIOP_Locate_Request_Header &request, + TAO_GIOP_Locate_Status_Msg &status_info + ) +{ + // Note here we are making the Locate reply header which is *QUITE* + // different from the reply header made by the make_reply () call.. + // Make the GIOP message header + this->write_protocol_header (TAO_GIOP_LOCATEREPLY, + *this->output_); + + // This writes the header & body + this->write_locate_reply_mesg (*this->output_, + request.request_id (), + status_info); + + // Send the message + int result = transport->send_message (*this->output_); + + // Print out message if there is an error + if (result == -1) + { + if (TAO_debug_level > 0) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("TAO: (%P|%t) %p: cannot send reply\n"), + ACE_TEXT ("TAO_GIOP::process_server_message"))); + } + } + + return result; +} + +int +TAO_GIOP_Message_Lite::parse_reply (TAO_InputCDR &cdr, + TAO_Pluggable_Reply_Params ¶ms) +{ + + // Read the request id + if (!cdr.read_ulong (params.request_id_)) + { + if (TAO_debug_level > 0) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t|%N|%l) : ") + ACE_TEXT ("TAO_GIOP_Message_Lite::parse_reply, ") + ACE_TEXT ("extracting request id"))); + } + + return -1; + } + + CORBA::ULong rep_stat = 0; + + // and the reply status type. status can be NO_EXCEPTION, + // SYSTEM_EXCEPTION, USER_EXCEPTION, LOCATION_FORWARD + // CAnnot handle LOCATION_FORWARD_PERM here + if (!cdr.read_ulong (rep_stat)) + { + if (TAO_debug_level > 0) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t|%N|%l) : ") + ACE_TEXT ("TAO_GIOP_Message_Lite::parse_reply, ") + ACE_TEXT ("extracting reply status\n"))); + } + + return -1; + } + + // Pass the right Pluggable interface code to the transport layer + switch (rep_stat) + { + // Request completed successfully + case TAO_GIOP_NO_EXCEPTION: + params.reply_status_ = TAO_PLUGGABLE_MESSAGE_NO_EXCEPTION; + break; + + // Request terminated with user exception + case TAO_GIOP_USER_EXCEPTION: + params.reply_status_ = TAO_PLUGGABLE_MESSAGE_USER_EXCEPTION; + break; + // Request terminated with system exception + case TAO_GIOP_SYSTEM_EXCEPTION: + params.reply_status_ = TAO_PLUGGABLE_MESSAGE_SYSTEM_EXCEPTION; + break; + // Reply is a location forward type + case TAO_GIOP_LOCATION_FORWARD: + params.reply_status_ = TAO_PLUGGABLE_MESSAGE_LOCATION_FORWARD; + break; + default: + if (TAO_debug_level > 0) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%N|%l) Unknown reply status \n"))); + } + } + + return 0; +} + + + +int +TAO_GIOP_Message_Lite::write_reply_header ( + TAO_OutputCDR &output, + TAO_Pluggable_Reply_Params &reply, + CORBA::Environment & /* ACE_TRY_ENV */ + ) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + // Write the GIOP Lite header first + this->write_protocol_header (TAO_GIOP_REPLY, + output); + + // Write the request ID + output.write_ulong (reply.request_id_); + + // Write the reply status + switch (reply.reply_status_) + { + case TAO_PLUGGABLE_MESSAGE_NO_EXCEPTION: + output.write_ulong (TAO_GIOP_NO_EXCEPTION); + break; + case TAO_PLUGGABLE_MESSAGE_LOCATION_FORWARD: + output.write_ulong (TAO_GIOP_LOCATION_FORWARD); + break; + case TAO_PLUGGABLE_MESSAGE_SYSTEM_EXCEPTION: + output.write_ulong (TAO_GIOP_SYSTEM_EXCEPTION); + break; + case TAO_PLUGGABLE_MESSAGE_USER_EXCEPTION: + output.write_ulong (TAO_GIOP_USER_EXCEPTION); + break; + default: + // Some other specifc exception + output.write_ulong (reply.reply_status_); + break; + } + + return 1; +} + +int +TAO_GIOP_Message_Lite::write_request_header ( + const TAO_Operation_Details &opdetails, + TAO_Target_Specification &spec, + TAO_OutputCDR &out_stream + ) +{ + out_stream << opdetails.request_id (); + + const CORBA::Octet response_flags = opdetails.response_flags (); + + // @@ (JP) Temporary hack until all of GIOP 1.2 is implemented. + if (response_flags == TAO_TWOWAY_RESPONSE_FLAG) + { + out_stream << CORBA::Any::from_octet (1); + } + // Sync scope - ignored by server if request is not oneway. + else if (response_flags == CORBA::Octet (TAO::SYNC_WITH_TRANSPORT) + || response_flags == CORBA::Octet (TAO::SYNC_NONE) + || response_flags == CORBA::Octet (TAO::SYNC_EAGER_BUFFERING) + || response_flags == CORBA::Octet (TAO::SYNC_DELAYED_BUFFERING)) + { + // No response required. + out_stream << CORBA::Any::from_octet (0); + } + else if (response_flags == CORBA::Octet (TAO::SYNC_WITH_SERVER)) + { + // Return before dispatching servant. We're also setting the high + // bit here. This is a temporary fix until the rest of GIOP 1.2 is + // implemented in TAO. + out_stream << CORBA::Any::from_octet (129); + } + else if (response_flags == CORBA::Octet (TAO::SYNC_WITH_TARGET)) + { + // Return after dispatching servant. + out_stream << CORBA::Any::from_octet (3); + } + else + { + // Until more flags are defined by the OMG. + return 0; + } + + // In this case we cannot recognise anything other than the Object + // key as the address disposition variable. But we do a sanity check + // anyway. + const TAO_ObjectKey *key = spec.object_key (); + + if (key != 0) + { + // Put in the object key + out_stream << *key; + } + else + { + if (TAO_debug_level) { ACE_DEBUG (( LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) TAO_GIOP::process_server_locate - ") - ACE_TEXT ("C++ exception raised\n") + ACE_TEXT ("(%N |%l) Unable to handle this request \n") )); } + + return 0; } -#endif /* TAO_HAS_EXCEPTIONS */ - ACE_ENDTRY; + out_stream.write_string (opdetails.opname_len (), + opdetails.opname ()); + + return 1; +} + +int +TAO_GIOP_Message_Lite::write_locate_request_header ( + CORBA::ULong request_id, + TAO_Target_Specification &spec, + TAO_OutputCDR &msg + ) +{ + msg << request_id; + + // In this case we cannot recognise anything other than the Object + // key as the address disposition variable. But we do a sanity check + // anyway. + const TAO_ObjectKey *key = spec.object_key (); + + if (key) + { + // Put in the object key + msg << *key; + } + else + { + if (TAO_debug_level) + { + ACE_DEBUG (( + LM_DEBUG, + ACE_TEXT ("(%N |%l) Unable to handle this request \n") + )); + } + + return 0; + } - return this->make_locate_reply (transport, - *this->output_, - locate_request, - status_info); + return 1; } + int TAO_GIOP_Message_Lite::parse_request_header (TAO_ServerRequest &request) { @@ -1089,7 +1230,7 @@ TAO_GIOP_Message_Lite::send_reply_exception ( orb_core->to_unicode ()); // Make the GIOP & reply header. They are version specific. - TAO_Pluggable_Reply_Params reply_params; + TAO_Pluggable_Reply_Params reply_params (orb_core); reply_params.request_id_ = request_id; reply_params.svc_ctx_.length (0); @@ -1133,16 +1274,14 @@ TAO_GIOP_Message_Lite::send_reply_exception ( } ACE_ENDTRY; - return this->send_message (transport, - output); + return transport->send_message (output); } int -TAO_GIOP_Message_Lite::make_locate_reply ( - TAO_Transport *transport, +TAO_GIOP_Message_Lite::write_locate_reply_mesg ( TAO_OutputCDR & output, - TAO_GIOP_Locate_Request_Header &request, + CORBA::ULong request_id, TAO_GIOP_Locate_Status_Msg &status_info ) { @@ -1150,11 +1289,11 @@ TAO_GIOP_Message_Lite::make_locate_reply ( // different from the reply header made by the make_reply () call.. // Make the GIOP message header - this->write_protocol_header (TAO_PLUGGABLE_MESSAGE_LOCATEREPLY, + this->write_protocol_header (TAO_GIOP_LOCATEREPLY, output); // Make the header for the locate request - output.write_ulong (request.request_id ()); + output.write_ulong (request_id); output.write_ulong (status_info.status); if (status_info.status == TAO_GIOP_OBJECT_FORWARD) @@ -1170,22 +1309,7 @@ TAO_GIOP_Message_Lite::make_locate_reply ( } } - // Send the message - int result = this->send_message (transport, - output); - - // Print out message if there is an error - if (result == -1) - { - if (TAO_debug_level > 0) - { - ACE_ERROR ((LM_ERROR, - ACE_TEXT ("TAO: (%P|%t) %p: cannot send reply\n"), - ACE_TEXT ("TAO_GIOP::process_server_message"))); - } - } - - return result; + return 1; } // Send an "I can't understand you" message -- again, the message is diff --git a/TAO/tao/GIOP_Message_Lite.h b/TAO/tao/GIOP_Message_Lite.h index 71e0c7676da..921b4ad4278 100644 --- a/TAO/tao/GIOP_Message_Lite.h +++ b/TAO/tao/GIOP_Message_Lite.h @@ -1,26 +1,20 @@ // -*- C++ -*- -// $Id$ - -// ============================================================================ -// -// = LIBRARY -// TAO -// -// = FILENAME -// GIOP_Message_Lite.h -// -// = DESCRIPTION -// Interface for the GIOP Lite messaging protocol -// -// = AUTHOR -// Balachandran Natarajan <bala@cs.wustl.edu> // -// ============================================================================ +// =================================================================== +/** + * @file GIOP_Message_Lite.h + * + * $Id$ + * + * @author Initially by Carlos <coryan@uci.edu> + * @author modified by Balachandran Natarajan <bala@cs.wustl.edu> + */ +// =================================================================== #ifndef TAO_GIOP_MESSAGE_LITE_H #define TAO_GIOP_MESSAGE_LITE_H #include "ace/pre.h" -#include "tao/CDR.h" +#include "tao/Pluggable_Messaging.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once @@ -29,166 +23,234 @@ #include "tao/GIOP_Utils.h" #include "tao/GIOP_Message_State.h" +#include "tao/CDR.h" -class TAO_ServerRequest; +class TAO_Operation_Details; +class TAO_Pluggable_Reply_Params; class TAO_GIOP_Locate_Request_Header; +/** + * @class TAO_GIOP_Message_Lite + * + * @brief Definitions of GIOPLite specific stuff + * + * This protocol is a modified version of GIOP. This is more suited + * for homogenous platforms. + */ class TAO_Export TAO_GIOP_Message_Lite : public TAO_Pluggable_Messaging { - // = TITLE - // Definitions got the GIOP lite - // - // = DESCRIPTION - // @@ Bala: please be precise when writing comments, and avoid - // colloquial expressions like 'quite a', you are writing - // technical documentation, not a novel... - // @@ Bala: wouldn't this be a better description of this class: - // Implement the GIOP lite pluggable messaging protocol. Very - // similar to the GIOP protocol, but several fields are removed to - // reduce overhead on homogenous systems. - // - // The interface is quite a replica of the GIOP Base - // interface. Implmenetation may not vary much too. But we are - // having a seperate interface to have seperation of concerns. - // public: + + /// Constructor TAO_GIOP_Message_Lite (TAO_ORB_Core *orb_core); - // Ctor + /// Dtor virtual ~TAO_GIOP_Message_Lite (void); - // Dtor - - virtual CORBA::Boolean write_protocol_header (TAO_Pluggable_Message_Type t, - TAO_OutputCDR &msg); - // Writes the GIOPLite header in to <msg> - - int handle_input (TAO_Transport *transport, - TAO_ORB_Core *orb_core, - TAO_Message_State_Factory &mesg_state, - ACE_Time_Value *max_time_value = 0); - // Reads input from the transport layer to the cdr stream in <mesg_state> - - virtual CORBA::Boolean - write_message_header (const TAO_Operation_Details &opdetails, - TAO_Pluggable_Header_Type header_type, - TAO_Target_Specification &spec, - TAO_OutputCDR &msg); - // Write the header defined by <header_type> in to <msg> - - int send_message (TAO_Transport *transport, - TAO_OutputCDR &stream, - ACE_Time_Value *max_wait_time = 0, - TAO_Stub *stub = 0, - int two_way = 1); - // Sends the encapsulated stream in <stream> on to the transport - - int parse_reply (TAO_Message_State_Factory &mesg_state, - TAO_Pluggable_Reply_Params ¶ms); - // Parse the reply message from the server - int process_client_message (TAO_Transport *transport, - TAO_ORB_Core *orb_core, - TAO_InputCDR &input, - CORBA::Octet message_type); - // Processes the messages from the connectors so that they can be - // passed on to the appropriate states. - - CORBA::Boolean write_reply_header (TAO_OutputCDR &cdr, - TAO_Pluggable_Reply_Params &reply, - CORBA::Environment &ACE_TRY_ENV = - TAO_default_environment ()) - ACE_THROW_SPEC ((CORBA::SystemException)); + /// Initialize the object -- this is a dummy for GIOPlite + virtual void init (CORBA::Octet, CORBA::Octet); + + /// Reset the messaging the object + virtual void reset (int reset_flag = 1); + + /// Write the RequestHeader in to the <cdr> stream. The underlying + /// implementation of the mesaging should do the right thing. + virtual int generate_request_header (TAO_Operation_Details &op, + TAO_Target_Specification &spec, + TAO_OutputCDR &cdr); + + /// Write the RequestHeader in to the <cdr> stream. + virtual int generate_locate_request_header ( + TAO_Operation_Details &op, + TAO_Target_Specification &spec, + TAO_OutputCDR &cdr); + + /// Write the reply header + virtual int generate_reply_header ( + TAO_OutputCDR &cdr, + TAO_Pluggable_Reply_Params ¶ms); + + /// This method reads the message on the connection. Returns 0 when + /// there is short read on the connection. Returns 1 when the full + /// message is read and handled. Returns -1 on errors. If <block> is + /// 1, then reply is read in a blocking manner. <bytes> indicates the + /// number of bytes that needs to be read from the connection. + /// GIOP uses this read to unmarshall the message details that appear + /// on the connection. + virtual int read_message (TAO_Transport *transport, + int block = 0, + ACE_Time_Value *max_wait_time = 0); + + + /// Format the message. As we have not written the message length in + /// the header, we make use of this oppurtunity to insert and format + /// the message. + virtual int format_message (TAO_OutputCDR &cdr); + + + /// Get the message type. The return value would be one of the + /// following: + /// TAO_PLUGGABLE_MESSAGE_REQUEST, + /// TAO_PLUGGABLE_MESSAGE_REPLY, + /// TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION, + /// TAO_PLUGGABLE_MESSAGE_MESSAGE_ERROR. + virtual TAO_Pluggable_Message_Type message_type (void); + + + + /// Process the request message that we have received on the + /// connection + virtual int process_request_message (TAO_Transport *transport, + TAO_ORB_Core *orb_core); + + /// Parse the reply message that we received and return the reply + /// information though <reply_info> + virtual int process_reply_message ( + TAO_Pluggable_Reply_Params &reply_info); + + /// Generate a reply message with the exception <ex>. + virtual int generate_exception_reply ( + TAO_OutputCDR &cdr, + TAO_Pluggable_Reply_Params ¶ms, + CORBA::Exception &x); private: - CORBA::Boolean - write_request_header (const TAO_Operation_Details &details, - TAO_Target_Specification &spec, - TAO_OutputCDR &msg); - // Write the GIOP lite request header in to <msg> - - CORBA::Boolean - write_locate_request_header (CORBA::ULong request_id, - TAO_Target_Specification &spec, - TAO_OutputCDR &msg); - // Write the GIOP lite locate request header in to <msg> - int parse_header (TAO_GIOP_Message_State *state); - // Parse the header + /// Writes the GIOP header in to <msg> + /// NOTE: If the GIOP header happens to change in the future, we can + /// push this method in to the generator_parser classes. + int write_protocol_header (TAO_GIOP_Message_Type t, + TAO_OutputCDR &msg); + /// Processes the <GIOP_REQUEST> messages + int process_request (TAO_Transport *transport, + TAO_ORB_Core *orb_core, + TAO_InputCDR &input); - int process_client_request (TAO_Transport *transport, - TAO_ORB_Core* orb_core, + /// Processes the <GIOP_LOCATE_REQUEST> messages + int process_locate_request (TAO_Transport *transport, + TAO_ORB_Core *orb_core, TAO_InputCDR &input); - // A request was received on the server side. <transport> is the - // source of the message (and thus where the replies should be - // sent). <orb_core> is the ORB that received the message <input> - // contains the message <output> can be used to store any responses - // <request_id> and <response_required> are set as part of the - // message processing. + /// Make a <GIOP_LOCATEREPLY> and hand that over to the transport so + /// that it can be sent over the connection. + /// NOTE:As on date 1.1 & 1.2 seem to have similar headers. Till an + /// unmanageable difference comes let them be implemented here. + int make_send_locate_reply (TAO_Transport *transport, + TAO_GIOP_Locate_Request_Header &request, + TAO_GIOP_Locate_Status_Msg &status); + + /// Send error messages + int send_error (TAO_Transport *transport); - int process_client_locate (TAO_Transport *transport, - TAO_ORB_Core* orb_core, - TAO_InputCDR &input); - // A LocateRequest was received on the server side. <transport> is - // the source of the message (and thus where the replies should be - // sent). <orb_core> is the ORB that received the message <input> - // contains the message <output> can be used to store any responses - // <request_id> and <response_required> are set as part of the - // message processing. + /// Parses the header of the GIOP messages for validity + int parse_header (void); - int parse_request_header (TAO_ServerRequest &request); - // Parse the Request Message header + /// Validates the first 4 bytes that contain the magic word + /// "GIOP". Also calls the validate_version () on the incoming + /// stream. + int parse_magic_bytes (void); - int parse_locate_header (TAO_GIOP_Locate_Request_Header &request); - // Parse the Locate Request header + /// This will do a validation of the stream that arrive in the + /// transport. + int validate_version (void); + /// Set the state + void set_state (CORBA::Octet major, + CORBA::Octet minor); + /// Close a connection, first sending GIOP::CloseConnection. + void send_close_connection (const TAO_GIOP_Version &version, + TAO_Transport *transport, + void *ctx); + + /// We must send a LocateReply through <transport>, this request + /// resulted in some kind of exception. int send_reply_exception (TAO_Transport *transport, TAO_ORB_Core* orb_core, CORBA::ULong request_id, IOP::ServiceContextList *svc_info, CORBA::Exception *x); - // We must send a LocateReply through <transport>, this request - // resulted in some kind of exception. - - int make_locate_reply (TAO_Transport *transport, - TAO_OutputCDR & output, - TAO_GIOP_Locate_Request_Header &request, - TAO_GIOP_Locate_Status_Msg &status_info); - // Making replies for LocateRequest messages. - - int send_error (TAO_Transport *transport); - // Send error messages + /// Print out a debug messages.. void dump_msg (const char *label, const u_char *ptr, size_t len); - // Print out the contents of the buffer. - ACE_Allocator *cdr_buffer_alloc_; - ACE_Allocator *cdr_dblock_alloc_; - // Allocators for the outpur CDR that we hold. As we cannot rely on - // the resources from ORB Core we reserve our own resources. The - // reason that we cannot believe the ORB core is that, for a - // multi-threaded servers it dishes out resources cached in - // TSS. This would be dangerous as TSS gets destroyed before we - // would. So we have our own memory that we can rely on. + /// Write the locate reply header + virtual int generate_locate_reply_header ( + TAO_OutputCDR & /*cdr*/, + TAO_Pluggable_Reply_Params & /*params*/) { return 0;}; + +private: + + /// Write the request header in to <msg> + int write_request_header ( + const TAO_Operation_Details &opdetails, + TAO_Target_Specification &spec, + TAO_OutputCDR &msg); + + /// Write the LocateRequest header + int write_locate_request_header ( + CORBA::ULong request_id, + TAO_Target_Specification &spec, + TAO_OutputCDR &msg); + + /// Write the reply header in to <output> + int write_reply_header ( + TAO_OutputCDR &output, + TAO_Pluggable_Reply_Params &reply, + CORBA::Environment &ACE_TRY_ENV = + TAO_default_environment ()) + ACE_THROW_SPEC ((CORBA::SystemException)); + + /// Writes the locate _reply message in to the <output> + int write_locate_reply_mesg ( + TAO_OutputCDR &output, + CORBA::ULong request_id, + TAO_GIOP_Locate_Status_Msg &status); + + /// Parse the Request Header from the incoming stream. This will do a + /// version specific parsing of the incoming Request header + int parse_request_header (TAO_ServerRequest &); + + /// Parse the Loacte Request Header from the incoming stream. This will do a + /// version specific parsing of the incoming Request header + int parse_locate_header ( + TAO_GIOP_Locate_Request_Header &); + /// Parse the reply message + int parse_reply (TAO_InputCDR &input, + TAO_Pluggable_Reply_Params ¶ms); + + /// Parse the locate reply message from the server + int parse_locate_reply (TAO_InputCDR &input, + TAO_Pluggable_Reply_Params ¶ms); + +private: + + /// The message state. It represents the status of the messages that + /// have been read from the connection. + TAO_GIOP_Message_State message_state_; + + /// Output CDR TAO_OutputCDR *output_; - // The output cdr for the GIOP lite message + /// Allocators for the output CDR that we hold. As we cannot rely on + /// the resources from ORB Core we reserve our own resources. The + /// reason that we cannot believe the ORB core is that, for a + /// multi-threaded servers it dishes out resources cached in + /// TSS. This would be dangerous as TSS gets destroyed before we + /// would. So we have our own memory that we can rely on. + /// Implementations of GIOP that we have + ACE_Allocator *cdr_buffer_alloc_; + ACE_Allocator *cdr_dblock_alloc_; + + /// A buffer that we will use to initialise the CDR stream char repbuf_[ACE_CDR::DEFAULT_BUFSIZE]; - // Char array to initialise our Output CDR class }; -// @@ Bala: Do you really need to define these in the header file? -// The IIOP Lite header length and the offset of the message size -// field in it. -const size_t TAO_GIOP_LITE_HEADER_LEN = 5; -const size_t TAO_GIOP_LITE_MESSAGE_SIZE_OFFSET = 0; -const size_t TAO_GIOP_LITE_MESSAGE_TYPE_OFFSET = 4; diff --git a/TAO/tao/GIOP_Message_Lite.i b/TAO/tao/GIOP_Message_Lite.i index 7fbff4b3806..ca0908bbcf6 100644 --- a/TAO/tao/GIOP_Message_Lite.i +++ b/TAO/tao/GIOP_Message_Lite.i @@ -1,43 +1 @@ //$Id$ - - -ACE_INLINE -TAO_GIOP_Message_Lite::~TAO_GIOP_Message_Lite (void) -{ - // Explicitly call the destructor of the output CDR first. They need - // the allocators during destruction. - delete this->output_; - - // Then call the destructor of our allocators - if (this->cdr_dblock_alloc_ != 0) - this->cdr_dblock_alloc_->remove (); - // delete this->cdr_dblock_alloc_; - - if (this->cdr_buffer_alloc_ != 0) - this->cdr_buffer_alloc_->remove (); - // delete this->cdr_buffer_alloc_; -} - - -ACE_INLINE int -TAO_GIOP_Message_Lite::parse_header (TAO_GIOP_Message_State *state) -{ - // Get the read pointer - char *buf = state->cdr.rd_ptr (); - - // @@ Bala: i added the following comment, does it make sense? - // In GIOPLite the version, byte order info, etc. are hardcoded, and - // not transmitted over the wire. - state->byte_order = TAO_ENCAP_BYTE_ORDER; - state->giop_version.major = TAO_DEF_GIOP_MAJOR; - state->giop_version.minor = TAO_DEF_GIOP_MINOR; - - // Get the message type. - state->message_type = buf[TAO_GIOP_LITE_MESSAGE_TYPE_OFFSET]; - - state->cdr.reset_byte_order (state->byte_order); - // The first bytes are the length of the message. - state->cdr.read_ulong (state->message_size); - - return 0; -} diff --git a/TAO/tao/IIOP_Connection_Handler.cpp b/TAO/tao/IIOP_Connection_Handler.cpp index 9931e66b0c7..b265fd3ca07 100644 --- a/TAO/tao/IIOP_Connection_Handler.cpp +++ b/TAO/tao/IIOP_Connection_Handler.cpp @@ -107,24 +107,6 @@ TAO_IIOP_Connection_Handler::open (void*) else if (addr.addr_to_string (client, sizeof (client)) == -1) return -1; - // Construct an IIOP_Endpoint object - TAO_IIOP_Endpoint endpoint (addr, - 0); - - // Construct a property object - TAO_Base_Connection_Property prop (&endpoint); - - // Add the handler to Cache - if (this->orb_core ()->connection_cache ().cache_handler (&prop, - this) == -1) - { - if (TAO_debug_level > 4) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) unable to cache the handle \n"))); - } - } - if (TAO_debug_level > 0) { ACE_DEBUG ((LM_DEBUG, @@ -169,9 +151,27 @@ TAO_IIOP_Connection_Handler::activate (long flags, } int +TAO_IIOP_Connection_Handler::svc (void) +{ + // This method is called when an instance is "activated", i.e., + // turned into an active object. Presumably, activation spawns a + // thread with this method as the "worker function". + + // Call the implementation here + return this->svc_i (); +} + + +int TAO_IIOP_Connection_Handler::handle_close (ACE_HANDLE handle, ACE_Reactor_Mask rm) { + // @@ Alex: we need to figure out if the transport decides to close + // us or something else. If it is something else (for example + // the cached connector trying to make room for other + // connections) then we should let the transport know, so it can + // in turn take appropiate action (such as sending exceptions to + // all waiting reply handlers). if (TAO_debug_level) ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) ") @@ -184,6 +184,9 @@ TAO_IIOP_Connection_Handler::handle_close (ACE_HANDLE handle, if (this->refcount_ == 0 && this->is_registered ()) { + // Make sure there are no timers. + this->reactor ()->cancel_timer (this); + // Set the flag to indicate that it is no longer registered with // the reactor, so that it isn't included in the set that is // passed to the reactor on ORB destruction. @@ -191,100 +194,8 @@ TAO_IIOP_Connection_Handler::handle_close (ACE_HANDLE handle, // Decrement the reference count this->decr_ref_count (); - - //return TAO_IIOP_SVC_HANDLER::handle_close (handle, rm); - } - - return 0; -} - -int -TAO_IIOP_Connection_Handler::svc (void) -{ - // This method is called when an instance is "activated", i.e., - // turned into an active object. Presumably, activation spawns a - // thread with this method as the "worker function". - - // Call the implementation here - return this->svc_i (); -} - -int -TAO_IIOP_Connection_Handler::handle_input (ACE_HANDLE h) -{ - return this->handle_input_i (h); -} - -int -TAO_IIOP_Connection_Handler::handle_input_i (ACE_HANDLE, - ACE_Time_Value *max_wait_time) -{ - this->refcount_++; - - cout << "Getting called " <<endl; - // Call the transport read the message - int result = this->transport_.read_process_message (max_wait_time); - - // Now the message has been read - if (result == -1 && TAO_debug_level > 0) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - %p\n"), - ACE_TEXT ("IIOP_Connection_Handler::read_message \n"))); - - } - - if (result == 0 || result == -1) - { - --this->refcount_; - if (this->refcount_ == 0) - this->decr_ref_count (); - - return result; } - - - // - // Take out all the information from the <message_state> and reset - // it so that nested upcall on the same transport can be handled. - // - - // Notice that the message_state is only modified in one thread at a - // time because the reactor does not call handle_input() for the - // same Event_Handler in two threads at the same time. - - // Copy message type. - /* TAO_GIOP_Message_State &ms = this->transport_.message_state_; - CORBA::Octet message_type = ms.message_type; - - // Copy version. - TAO_GIOP_Version giop_version = ms.giop_version; - - // Steal the input CDR from the message state. - TAO_InputCDR input_cdr (ACE_InputCDR::Transfer_Contents (ms.cdr), - this->orb_core ()); - - // Send the message state for the service layer like FT to log the - // messages - this->orb_core ()->services_log_msg_rcv (ms); - - // Reset the message state. - this->transport_.message_state_.reset (0); - result = - this->acceptor_factory_->process_client_message (this->transport (), - this->orb_core (), - input_cdr, - message_type); - - if (result != -1) - result = 0;*/ - - --this->refcount_; - if (this->refcount_ == 0) - this->decr_ref_count (); - //this->TAO_IIOP_SVC_HANDLER::handle_close (); - return 0; } @@ -294,28 +205,10 @@ TAO_IIOP_Connection_Handler::fetch_handle (void) return this->get_handle (); } -int -TAO_IIOP_Connection_Handler::close (u_long) -{ - this->destroy (); - - return 0; -} - - -/*int -TAO_IIOP_Client_Connection_Handler::handle_input (ACE_HANDLE) -{ - int r = this->transport ()->handle_client_input (); - if (r == -1) - return -1; - return 0; -} -*/ int TAO_IIOP_Connection_Handler::handle_timeout (const ACE_Time_Value &, - const void *) + const void *) { // This method is called when buffering timer expires. // @@ -333,65 +226,82 @@ TAO_IIOP_Connection_Handler::handle_timeout (const ACE_Time_Value &, return 0; } -/*int -TAO_IIOP_Client_Connection_Handler::handle_close (ACE_HANDLE handle, - ACE_Reactor_Mask rm) -{ - // @@ Alex: we need to figure out if the transport decides to close - // us or something else. If it is something else (for example - // the cached connector trying to make room for other - // connections) then we should let the transport know, so it can - // in turn take appropiate action (such as sending exceptions to - // all waiting reply handlers). - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) IIOP_Client_Connection_Handler::") - ACE_TEXT ("handle_close (%d, %d)\n"), handle, rm)); +int +TAO_IIOP_Connection_Handler::close (u_long) +{ + this->destroy (); - // Deregister this handler with the ACE_Reactor. - return this->handle_cleanup (); + return 0; } + int -TAO_IIOP_Client_Connection_Handler::handle_close_i (ACE_HANDLE handle, - ACE_Reactor_Mask rm) +TAO_IIOP_Connection_Handler::add_handler_to_cache (void) { - // @@ Alex: we need to figure out if the transport decides to close - // us or something else. If it is something else (for example - // the cached connector trying to make room for other - // connections) then we should let the transport know, so it can - // in turn take appropiate action (such as sending exceptions to - // all waiting reply handlers). + ACE_INET_Addr addr; - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) IIOP_Client_Connection_Handler::") - ACE_TEXT ("handle_close_i (%d, %d)\n"), handle, rm)); + // Get the peername. + if (this->peer ().get_remote_addr (addr) == -1) + return -1; + + // Construct an IIOP_Endpoint object + TAO_IIOP_Endpoint endpoint (addr, + 0); + + // Construct a property object + TAO_Base_Connection_Property prop (&endpoint); + + // Add the handler to Cache + return this->orb_core ()->connection_cache ().cache_handler (&prop, + this); +} - // Deregister this handler with the ACE_Reactor. - return this->handle_cleanup (); + +int +TAO_IIOP_Connection_Handler::handle_input (ACE_HANDLE h) +{ + return this->handle_input_i (h); } -*/ + int -TAO_IIOP_Connection_Handler::handle_cleanup (void) +TAO_IIOP_Connection_Handler::handle_input_i (ACE_HANDLE, + ACE_Time_Value *max_wait_time) { - // Call the implementation. - if (this->reactor ()) + this->refcount_++; + + // Call the transport read the message + int result = this->transport_.read_process_message (max_wait_time); + + // Now the message has been read + if (result == -1 && TAO_debug_level > 0) { - // Make sure there are no timers. - this->reactor ()->cancel_timer (this); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - %p\n"), + ACE_TEXT ("IIOP_Connection_Handler::read_message \n"))); + } - // Now do the decrement of the ref count - this->decr_ref_count (); + // The upcall is done. Bump down the reference count + --this->refcount_; + if (this->refcount_ == 0) + this->decr_ref_count (); + + if (result == 0 || result == -1) + { + return result; + } return 0; } + + + + // **************************************************************** #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) diff --git a/TAO/tao/IIOP_Connection_Handler.h b/TAO/tao/IIOP_Connection_Handler.h index a72439326a7..aee04e27cc0 100644 --- a/TAO/tao/IIOP_Connection_Handler.h +++ b/TAO/tao/IIOP_Connection_Handler.h @@ -32,27 +32,36 @@ class TAO_Pluggable_Messaging; // **************************************************************** +/** + * @class TAO_IIOP_Properties + * + * @brief TCP protocol properties specification for a set of + * connections. + * + */ + class TAO_Export TAO_IIOP_Properties { - // = TITLE - // TCP protocol properties specification for a set of - // connections. - // + public: int send_buffer_size; int recv_buffer_size; int no_delay; }; + +// **************************************************************** + /** * @class TAO_IIOP_Connection_Handler * * @brief Handles requests on a single connection. * - * The handler is common for both the Acceptor and the Connector sides. + * The Connection handler which is common for the Acceptor and + * the Connector */ -// **************************************************************** + class TAO_Export TAO_IIOP_Connection_Handler : public TAO_IIOP_SVC_HANDLER, public TAO_Connection_Handler { @@ -109,6 +118,9 @@ public: /// Object termination hook. virtual int close (u_long flags = 0); + /// Add ourselves to Cache. + int add_handler_to_cache (void); + protected: /// = Event Handler overloads @@ -123,14 +135,6 @@ protected: virtual int handle_input_i (ACE_HANDLE = ACE_INVALID_HANDLE, ACE_Time_Value *max_wait_time = 0); - /// Perform appropriate closing but without grabbing any locks. - virtual int handle_close_i ( - ACE_HANDLE = ACE_INVALID_HANDLE, - ACE_Reactor_Mask = ACE_Event_Handler::NULL_MASK) {return 1;} - - /// This method deregisters the handler from the reactor and closes it. - int handle_cleanup (void); - private: /// Transport object reference. @@ -147,162 +151,6 @@ private: }; -//ass TAO_Export TAO_IIOP_Server_Connection_Handler : public TAO_IIOP_SVC_HANDLER, -// public TAO_Connection_Handler -// -//// = TITLE -//// Handles requests on a single connection in a server. -// -//blic: -//TAO_IIOP_Server_Connection_Handler (ACE_Thread_Manager* t = 0); -//TAO_IIOP_Server_Connection_Handler (TAO_ORB_Core *orb_core, -// CORBA::Boolean flag, -// void *arg); -//// Constructor. <arg> parameter is used by the Acceptor to pass the -//// protocol configuration properties for this connection. -// -//~TAO_IIOP_Server_Connection_Handler (void); -//// Destructor. -// -//virtual int open (void *); -//// Called by the <Strategy_Acceptor> when the handler is completely -//// connected. Argument is unused. -// -//// = Active object activation method. -//virtual int activate (long flags = THR_NEW_LWP, -// int n_threads = 1, -// int force_active = 0, -// long priority = ACE_DEFAULT_THREAD_PRIORITY, -// int grp_id = -1, -// ACE_Task_Base *task = 0, -// ACE_hthread_t thread_handles[] = 0, -// void *stack[] = 0, -// size_t stack_size[] = 0, -// ACE_thread_t thread_names[] = 0); -// -//virtual int svc (void); -//// Only used when the handler is turned into an active object by -//// calling <activate>. This serves as the event loop in such cases. -// -//// = Template Methods Called by <handle_input> -// -//TAO_Transport *transport (void); -// -//virtual ACE_HANDLE fetch_handle (void); -//// Return the underlying handle -// -//virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE, -// ACE_Reactor_Mask = ACE_Event_Handler::NULL_MASK); -//// Perform appropriate closing. -//otected: -// -//// = Event Handler overloads -// -//virtual int handle_input (ACE_HANDLE = ACE_INVALID_HANDLE); -//virtual int handle_input_i (ACE_HANDLE = ACE_INVALID_HANDLE, -// ACE_Time_Value *max_wait_time = 0); -//// Reads a message from the <peer()>, dispatching and servicing it -//// appropriately. -//// handle_input() just delegates on handle_input_i() which timeouts -//// after <max_wait_time>, this is used in thread-per-connection to -//// ensure that server threads eventually exit. -// -//otected: -// -//TAO_IIOP_Server_Transport transport_; -//// @@ New transport object reference. -// -//TAO_Pluggable_Messaging *acceptor_factory_; -//// Messaging acceptor factory -// -//u_long refcount_; -//// Reference count.It is used to count nested upcalls on this -//// svc_handler i.e., the connection can close during nested upcalls, -//// you should not delete the svc_handler until the stack unwinds -//// from the nested upcalls. -// -//TAO_IIOP_Properties *tcp_properties_; -//// TCP configuration for this connection. -// -// -//************************************************************************/ -//ass TAO_Export TAO_IIOP_Client_Connection_Handler: public TAO_IIOP_SVC_HANDLER, -// public TAO_Connection_Handler -// -//// = TITLE -//// <Svc_Handler> used on the client side and returned by the -//// <TAO_CONNECTOR>. -//blic: -//// = Intialization method. -// -//TAO_IIOP_Client_Connection_Handler (ACE_Thread_Manager* t = 0); -//// This constructor should *never* get called, it is just here to -//// make the compiler happy: the default implementation of the -//// Creation_Strategy requires a constructor with that signature, we -//// don't use that implementation, but some (most?) compilers -//// instantiate it anyway. -// -//TAO_IIOP_Client_Connection_Handler (ACE_Thread_Manager *t, -// TAO_ORB_Core* orb_core, -// CORBA::Boolean flag, -// void *arg); -//// Constructor. <arg> parameter is used by the Connector to pass the -//// protocol configuration properties for this connection. -// -//virtual ~TAO_IIOP_Client_Connection_Handler (void); -// -//// = <Connector> hook. -//virtual int open (void *); -//// Activation template method. -// -//// = Event Handler overloads -// -//virtual int handle_input (ACE_HANDLE = ACE_INVALID_HANDLE); -//// Called when a response from a twoway invocation is available. -// -// -//virtual int handle_timeout (const ACE_Time_Value &tv, -// const void *arg = 0); -//// Called when buffering timer expires. -// -//virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE, -// ACE_Reactor_Mask = ACE_Event_Handler::NULL_MASK); -//// Perform appropriate closing. -// -//virtual int handle_close_i (ACE_HANDLE = ACE_INVALID_HANDLE, -// ACE_Reactor_Mask = ACE_Event_Handler::NULL_MASK); -//// Perform appropriate closing but without grabbing any locks. -// -//virtual int close (u_long flags = 0); -//// Object termination hook. -// -//virtual TAO_Transport *transport (void); -//// Return the transport objects -// -//virtual ACE_HANDLE fetch_handle (void); -//// Return the underlying handle -// -//otected: -// -//int handle_cleanup (void); -//// This method deregisters the handler from the reactor and closes it. -// -//TAO_IIOP_Client_Transport transport_; -//// Reference to the transport object, it is owned by this class. -// -//TAO_IIOP_Properties *tcp_properties_; -//// TCP configuration for this connection. -// -//ivate: -// -//virtual int handle_input_i (ACE_HANDLE = ACE_INVALID_HANDLE, -// ACE_Time_Value *max_wait_time = 0); -//// Will not be called at all. As a matter of fact should not be -//// called. This is just to override the pure virtual function in the -//// TAO_Connection_Handler class -// - - #if defined (__ACE_INLINE__) #include "tao/IIOP_Connection_Handler.i" #endif /* __ACE_INLINE__ */ diff --git a/TAO/tao/IIOP_Connection_Handler.i b/TAO/tao/IIOP_Connection_Handler.i index 941eb12b392..80f480a3663 100644 --- a/TAO/tao/IIOP_Connection_Handler.i +++ b/TAO/tao/IIOP_Connection_Handler.i @@ -6,9 +6,3 @@ TAO_IIOP_Connection_Handler::transport (void) { return &(this->transport_); } - -//ACE_INLINE TAO_Transport * -//TAO_IIOP_Client_Connection_Handler::transport (void) -//{ -// return &(this->transport_); -//} diff --git a/TAO/tao/IIOP_Transport.cpp b/TAO/tao/IIOP_Transport.cpp index 6f9810d5efe..a015dd462da 100644 --- a/TAO/tao/IIOP_Transport.cpp +++ b/TAO/tao/IIOP_Transport.cpp @@ -13,8 +13,8 @@ #include "tao/ORB_Core.h" #include "tao/debug.h" #include "tao/GIOP_Message_Base.h" +#include "tao/GIOP_Message_Lite.h" -//#include "tao/GIOP_Message_Lite.h" #if !defined (__ACE_INLINE__) # include "tao/IIOP_Transport.i" @@ -31,14 +31,13 @@ TAO_IIOP_Transport::TAO_IIOP_Transport (TAO_IIOP_Connection_Handler *handler, connection_handler_ (handler), messaging_object_ (0) { - /*if (flag) + if (flag) { // Use the lite version of the protocol ACE_NEW (this->messaging_object_, - GIOP_Message_Lite (orb_core)); + TAO_GIOP_Message_Lite (orb_core)); } - else*/ - ACE_UNUSED_ARG (flag); + else { // Use the normal GIOP object ACE_NEW (this->messaging_object_, @@ -146,8 +145,6 @@ int TAO_IIOP_Transport::read_process_message (ACE_Time_Value *max_wait_time, int block) { - cout << "In IIOP_TRansport " <<endl; - // Read the message of the socket int result = this->messaging_object_->read_message (this, block, @@ -173,110 +170,6 @@ TAO_IIOP_Transport::read_process_message (ACE_Time_Value *max_wait_time, } -// Return 0, when the reply is not read fully, 1 if it is read fully. -// @@ This code should go in the TAO_Transport class is repeated for -// each transport!! -// @@ Carlos says: no, the code should be factored out in GIOP helper -// classes, but not in Transport. Transport must deal with -// non-GIOP protocols, that may have completely different behavior. -// -int -TAO_IIOP_Transport::handle_client_input (int /* block */, - ACE_Time_Value * /*max_wait_time*/) -{ - - // Notice that the message_state is only modified in one thread at a - // time because the reactor does not call handle_input() for the - // same Event_Handler in two threads at the same time. - - // Get the message state from the Transport Mux Strategy. - /* TAO_GIOP_Message_State* message_state = - this->tms_->get_message_state (); - - if (message_state == 0) - { - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) IIOP_Transport::handle_client_input -") - ACE_TEXT (" nil message state\n"))); - this->tms_->connection_closed (); - return -1; - } - - - int result = this->client_mesg_factory_->handle_input (this, - this->orb_core_, - *message_state, - max_wait_time); - if (result == -1) - { - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - %p\n"), - ACE_TEXT ("IIOP_Transport::handle_client_input, handle_input"))); - - this->tms_->connection_closed (); - return -1; - } - if (result == 0) - return result; - - // OK, the complete message is here... - - result = this->client_mesg_factory_->parse_reply (*message_state, - this->params_); - if (result == -1) - { - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - %p\n"), - ACE_TEXT ("IIOP_Transport::handle_client_input, parse reply"))); - message_state->reset (); - this->tms_->connection_closed (); - return -1; - } - - result = - this->tms_->dispatch_reply (this->params_.request_id_, - this->params_.reply_status_, - message_state->giop_version, - this->params_.svc_ctx_, - message_state); - - // @@ Somehow it seems dangerous to reset the state *after* - // dispatching the request, what if another threads receives - // another reply in the same connection? - // My guess is that it works as follows: - // - For the exclusive case there can be no such thread. - // - The the muxed case each thread has its own message_state. - // I'm pretty sure this comment is right. Could somebody else - // please look at it and confirm my guess? - if (result == -1) - { - if (TAO_debug_level > 0) - ACE_ERROR ((LM_ERROR, - ACE_TEXT ("TAO (%P|%t) : IIOP_Client_Transport::") - ACE_TEXT ("handle_client_input - ") - ACE_TEXT ("dispatch reply failed\n"))); - message_state->reset (); - this->tms_->connection_closed (); - return -1; - } - - if (result == 0) - { - message_state->reset (); - return 0; - } - - // This is a NOOP for the Exclusive request case, but it actually - // destroys the stream in the muxed case. - this->tms_->destroy_message_state (message_state); - */ - return 1; -} - - int TAO_IIOP_Transport::register_handler (void) { @@ -301,8 +194,6 @@ TAO_IIOP_Transport::register_handler (void) } - - int TAO_IIOP_Transport::send_request (TAO_Stub *stub, TAO_ORB_Core *orb_core, @@ -379,6 +270,8 @@ TAO_IIOP_Transport::start_request (TAO_ORB_Core * /*orb_core*/, { // TAO_FUNCTION_PP_TIMEPROBE (TAO_IIOP_CLIENT_TRANSPORT_START_REQUEST_START); + // @@ This method should NO longer be required.. + /* if (this->client_mesg_factory_->write_protocol_header (TAO_PLUGGABLE_MESSAGE_REQUEST, output) == 0) @@ -394,13 +287,6 @@ TAO_IIOP_Transport::start_locate (TAO_ORB_Core * /*orb_core*/, CORBA::Environment &ACE_TRY_ENV) ACE_THROW_SPEC ((CORBA::SystemException)) { - // See this is GIOP way of doing this..But anyway IIOP will be tied - // up with GIOP. - /* if (this->client_mesg_factory_->write_protocol_header - (TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST, - output) == 0) - ACE_THROW (CORBA::MARSHAL ());*/ - if (this->messaging_object_->generate_locate_request_header (opdetails, spec, output) == -1) @@ -536,483 +422,3 @@ TAO_IIOP_Transport::process_message (void) this->messaging_object_->reset (); return 1; } - - -//// **************************************************************** -// -///*TAO_IIOP_Server_Transport:: -// TAO_IIOP_Server_Transport (TAO_IIOP_Server_Connection_Handler *handler, -// TAO_ORB_Core* orb_core) -// : TAO_IIOP_Transport (orb_core), -// message_state_ (orb_core), -// handler_ (handler) -//{ -//} -// -//TAO_IIOP_Server_Transport::~TAO_IIOP_Server_Transport (void) -//{ -//} -// -//int -//TAO_IIOP_Server_Transport::idle (void) -//{ -// return this->handler_->make_idle (); -//} -// -//TAO_IIOP_SVC_HANDLER * -//TAO_IIOP_Server_Transport::service_handler (void) -//{ -// return this->handler_; -//} -// -//void -//TAO_IIOP_Server_Transport::close_connection (void) -//{ -// // Purge the entry from the Cache map first and then close the -// // handler -// this->handler_->purge_entry (); -// -// // Now close the handler -// this->handler_->handle_close (); -//} -// -// -//TAO_IIOP_Client_Transport:: -// TAO_IIOP_Client_Transport (TAO_IIOP_Client_Connection_Handler *handler, -// TAO_ORB_Core *orb_core) -// : TAO_IIOP_Transport (orb_core), -// handler_ (handler), -// client_mesg_factory_ (0), -// orb_core_ (orb_core), -// lite_flag_ (0), -// params_ () -//{ -//} -// -//TAO_IIOP_Client_Transport::~TAO_IIOP_Client_Transport (void) -//{ -// delete this->client_mesg_factory_; -//} -// -//int -//TAO_IIOP_Client_Transport::idle (void) -//{ -// return this->handler_->make_idle (); -//} -// -// -//void -//TAO_IIOP_Client_Transport::start_request (TAO_ORB_Core * /*orb_core*/, -// TAO_Target_Specification & /*spec */, -// TAO_OutputCDR &output, -// CORBA::Environment &ACE_TRY_ENV) -// ACE_THROW_SPEC ((CORBA::SystemException)) -//{ -// TAO_FUNCTION_PP_TIMEPROBE (TAO_IIOP_CLIENT_TRANSPORT_START_REQUEST_START); -// -// if (this->client_mesg_factory_->write_protocol_header -// (TAO_PLUGGABLE_MESSAGE_REQUEST, -// output) == 0) -// ACE_THROW (CORBA::MARSHAL ()); -//} -// -//void -//TAO_IIOP_Client_Transport::start_locate (TAO_ORB_Core * /*orb_core*/, -// TAO_Target_Specification &spec, -// TAO_Operation_Details &opdetails, -// TAO_OutputCDR &output, -// CORBA::Environment &ACE_TRY_ENV) -// ACE_THROW_SPEC ((CORBA::SystemException)) -//{ -// // See this is GIOP way of doing this..But anyway IIOP will be tied -// // up with GIOP. -// if (this->client_mesg_factory_->write_protocol_header -// (TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST, -// output) == 0) -// ACE_THROW (CORBA::MARSHAL ()); -// -// if (this->client_mesg_factory_->write_message_header (opdetails, -// TAO_PLUGGABLE_MESSAGE_LOCATE_REQUEST_HEADER, -// spec, -// output) == 0) -// ACE_THROW (CORBA::MARSHAL ()); -//} -// -//int -//TAO_IIOP_Client_Transport::send_request (TAO_Stub *stub, -// TAO_ORB_Core *orb_core, -// TAO_OutputCDR &stream, -// int two_way, -// ACE_Time_Value *max_wait_time) -//{ -// if (this->ws_->sending_request (orb_core, -// two_way) == -1) -// return -1; -// -// if (this->client_mesg_factory_->send_message (this, -// stream, -// max_wait_time, -// stub, -// two_way) == -1) -// return -1; -// -// return this->idle_after_send (); -//} -// -//// Return 0, when the reply is not read fully, 1 if it is read fully. -//// @@ This code should go in the TAO_Transport class is repeated for -//// each transport!! -//// @@ Carlos says: no, the code should be factored out in GIOP helper -//// classes, but not in Transport. Transport must deal with -//// non-GIOP protocols, that may have completely different behavior. -//// -//int -//TAO_IIOP_Client_Transport::handle_client_input (int /* block */, -// ACE_Time_Value *max_wait_time) -//{ -// -// // Notice that the message_state is only modified in one thread at a -// // time because the reactor does not call handle_input() for the -// // same Event_Handler in two threads at the same time. -// -// // Get the message state from the Transport Mux Strategy. -// TAO_GIOP_Message_State* message_state = -// this->tms_->get_message_state (); -// -// if (message_state == 0) -// { -// if (TAO_debug_level > 0) -// ACE_DEBUG ((LM_DEBUG, -// ACE_TEXT ("TAO (%P|%t) IIOP_Transport::handle_client_input -") -// ACE_TEXT (" nil message state\n"))); -// this->tms_->connection_closed (); -// return -1; -// } -// -// -// int result = this->client_mesg_factory_->handle_input (this, -// this->orb_core_, -// *message_state, -// max_wait_time); -// if (result == -1) -// { -// if (TAO_debug_level > 0) -// ACE_DEBUG ((LM_DEBUG, -// ACE_TEXT ("TAO (%P|%t) - %p\n"), -// ACE_TEXT ("IIOP_Transport::handle_client_input, handle_input"))); -// -// this->tms_->connection_closed (); -// return -1; -// } -// if (result == 0) -// return result; -// -// // OK, the complete message is here... -// -// result = this->client_mesg_factory_->parse_reply (*message_state, -// this->params_); -// if (result == -1) -// { -// if (TAO_debug_level > 0) -// ACE_DEBUG ((LM_DEBUG, -// ACE_TEXT ("TAO (%P|%t) - %p\n"), -// ACE_TEXT ("IIOP_Transport::handle_client_input, parse reply"))); -// message_state->reset (); -// this->tms_->connection_closed (); -// return -1; -// } -// -// result = -// this->tms_->dispatch_reply (this->params_.request_id_, -// this->params_.reply_status_, -// message_state->giop_version, -// this->params_.svc_ctx_, -// message_state); -// -// // @@ Somehow it seems dangerous to reset the state *after* -// // dispatching the request, what if another threads receives -// // another reply in the same connection? -// // My guess is that it works as follows: -// // - For the exclusive case there can be no such thread. -// // - The the muxed case each thread has its own message_state. -// // I'm pretty sure this comment is right. Could somebody else -// // please look at it and confirm my guess? -// if (result == -1) -// { -// if (TAO_debug_level > 0) -// ACE_ERROR ((LM_ERROR, -// ACE_TEXT ("TAO (%P|%t) : IIOP_Client_Transport::") -// ACE_TEXT ("handle_client_input - ") -// ACE_TEXT ("dispatch reply failed\n"))); -// message_state->reset (); -// this->tms_->connection_closed (); -// return -1; -// } -// -// if (result == 0) -// { -// message_state->reset (); -// return 0; -// } -// -// // This is a NOOP for the Exclusive request case, but it actually -// // destroys the stream in the muxed case. -// this->tms_->destroy_message_state (message_state); -// -// return result; -//} -// -//int -//TAO_IIOP_Client_Transport::register_handler (void) -//{ -// // @@ It seems like this method should go away, the right reactor is -// // picked at object creation time. -// ACE_Reactor *r = this->orb_core ()->reactor (); -// if (r == this->service_handler ()->reactor ()) -// return 0; -// -// // About to be registered with the reactor, so bump the ref -// // count -// this->handler_->incr_ref_count (); -// -// // Set the flag in the Connection Handler -// this->handler_->is_registered (1); -// -// // Register the handler with the reactor -// return r->register_handler (this->handler_, -// ACE_Event_Handler::READ_MASK); -//} -// -// -//TAO_IIOP_SVC_HANDLER * -//TAO_IIOP_Client_Transport::service_handler (void) -//{ -// return this->handler_; -//} -// -//int -//TAO_IIOP_Client_Transport::messaging_init (CORBA::Octet major, -// CORBA::Octet minor) -//{ -// if (this->client_mesg_factory_ == 0) -// { -// if (this->lite_flag_) -// { -// ACE_NEW_RETURN (this->client_mesg_factory_, -// TAO_GIOP_Message_Lite (this->orb_core_), -// -1); -// } -// else if (major == TAO_DEF_GIOP_MAJOR) -// { -// if (minor > TAO_DEF_GIOP_MINOR) -// minor = TAO_DEF_GIOP_MINOR; -// switch (minor) -// { -// case 0: -// ACE_NEW_RETURN (this->client_mesg_factory_, -// TAO_GIOP_Message_Connector_10, -// 0); -// break; -// case 1: -// ACE_NEW_RETURN (this->client_mesg_factory_, -// TAO_GIOP_Message_Connector_11, -// 0); -// break; -// case 2: -// ACE_NEW_RETURN (this->client_mesg_factory_, -// TAO_GIOP_Message_Connector_12, -// 0); -// break; -// default: -// if (TAO_debug_level > 0) -// { -// ACE_ERROR_RETURN ((LM_ERROR, -// ACE_TEXT ("(%N|%l|%p|%t) No matching minor version number \n")), -// 0); -// } -// } -// } -// else -// { -// if (TAO_debug_level > 0) -// { -// ACE_ERROR_RETURN ((LM_ERROR, -// ACE_TEXT ("(%N|%l|%p|%t) No matching major version number \n")), -// 0); -// } -// } -// } -// -// return 1; -//} -// -//CORBA::Boolean -//TAO_IIOP_Client_Transport::send_request_header (TAO_Operation_Details &opdetails, -// TAO_Target_Specification &spec, -// TAO_OutputCDR & msg) -//{ -// // We are going to pass on this request to the underlying messaging -// // layer. It should take care of this request -// CORBA::Boolean retval = -// this->client_mesg_factory_->write_message_header (opdetails, -// TAO_PLUGGABLE_MESSAGE_REQUEST_HEADER, -// spec, -// msg); -// -// return retval; -//} -// -// -//void -//TAO_IIOP_Client_Transport::close_connection (void) -//{ -// // Purge the entry from the Cache map first and then close the -// // handler -// this->handler_->purge_entry (); -// -// // Now close the handler -// this->handler_->handle_close (); -//} -// -//// ********************************************************************* -// -//ssize_t -//TAO_IIOP_Transport::send (TAO_Stub *stub, -// int two_way, -// const ACE_Message_Block *message_block, -// const ACE_Time_Value *max_wait_time) -//{ -// if (stub == 0 || two_way) -// { -// return this->send (message_block, -// max_wait_time); -// } -// else -// { -// TAO_Sync_Strategy &sync_strategy = stub->sync_strategy (); -// -// return sync_strategy.send (*this, -// *stub, -// message_block, -// max_wait_time); -// } -//} -// -//ssize_t -//TAO_IIOP_Transport::send (const ACE_Message_Block *message_block, -// const ACE_Time_Value *max_wait_time, -// size_t *bytes_transferred) -//{ -// TAO_FUNCTION_PP_TIMEPROBE (TAO_IIOP_TRANSPORT_SEND_START); -// -// return ACE::send_n (this->handle (), -// message_block, -// max_wait_time, -// bytes_transferred); -//} -// -//ssize_t -//TAO_IIOP_Transport::send (const u_char *buf, -// size_t len, -// const ACE_Time_Value *max_wait_time) -//{ -// TAO_FUNCTION_PP_TIMEPROBE (TAO_IIOP_TRANSPORT_SEND_START); -// -// return this->service_handler ()->peer ().send_n (buf, -// len, -// max_wait_time); -//} -// -//ssize_t -//TAO_IIOP_Transport::recv (char *buf, -// size_t len, -// const ACE_Time_Value *max_wait_time) -//{ -// TAO_FUNCTION_PP_TIMEPROBE (TAO_IIOP_TRANSPORT_RECEIVE_START); -// -// return this->service_handler ()->peer ().recv_n (buf, -// len, -// max_wait_time); -//} -// -//// Default action to be taken for send request. -//int -//TAO_IIOP_Transport::send_request (TAO_Stub *, -// TAO_ORB_Core * /* orb_core */, -// TAO_OutputCDR & /* stream */, -// int /* twoway */, -// ACE_Time_Value * /* max_wait_time */) -//{ -// return -1; -//} -// -// -// -//CORBA::Boolean -//TAO_IIOP_Transport::send_request_header (TAO_Operation_Details & /**/, -// TAO_Target_Specification & /*spec */ , -// TAO_OutputCDR & /*msg*/) -//{ -// // We should never be here. So return an error. -// return 0; -//} -//*/ - -//ssize_t -//TAO_IIOP_Transport::send (TAO_Stub *stub, -// int two_way, -// const ACE_Message_Block *message_block, -// const ACE_Time_Value *max_wait_time) -//{ -// if (stub == 0 || two_way) -// { -// return this->send (message_block, -// max_wait_time); -// } -// else -// { -// TAO_Sync_Strategy &sync_strategy = stub->sync_strategy (); -// -// return sync_strategy.send (*this, -// *stub, -// message_block, -// max_wait_time); -// } -//} -// -//ssize_t -//TAO_IIOP_Transport::send (const ACE_Message_Block *message_block, -// const ACE_Time_Value *max_wait_time, -// size_t *bytes_transferred) -//{ -// TAO_FUNCTION_PP_TIMEPROBE (TAO_IIOP_TRANSPORT_SEND_START); -// -// return ACE::send_n (this->handle (), -// message_block, -// max_wait_time, -// bytes_transferred); -//} -// -//ssize_t -//TAO_IIOP_Transport::send (const u_char *buf, -// size_t len, -// const ACE_Time_Value *max_wait_time) -//{ -// TAO_FUNCTION_PP_TIMEPROBE (TAO_IIOP_TRANSPORT_SEND_START); -// -// return this->service_handler ()->peer ().send_n (buf, -// len, -// max_wait_time); -//} -// -//ssize_t -//TAO_IIOP_Transport::recv (char *buf, -// size_t len, -// const ACE_Time_Value *max_wait_time) -//{ -// TAO_FUNCTION_PP_TIMEPROBE (TAO_IIOP_TRANSPORT_RECEIVE_START); -// -// return this->service_handler ()->peer ().recv_n (buf, -// len, -// max_wait_time); -//} diff --git a/TAO/tao/IIOP_Transport.h b/TAO/tao/IIOP_Transport.h index ed5ef4234e2..d32f6426c90 100644 --- a/TAO/tao/IIOP_Transport.h +++ b/TAO/tao/IIOP_Transport.h @@ -47,11 +47,6 @@ typedef ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH> * */ -// @@ TODO: Looks like most of the Transport classes like -// TAO_IIOP_Transport, TAO_UIOP_Transport, TAO_YADA_Transport share -// quite some common code. Need to abstract them out. This could save -// us some foot print -- Bala - class TAO_Export TAO_IIOP_Transport : public TAO_Transport { public: @@ -105,12 +100,10 @@ public: virtual int read_process_message (ACE_Time_Value *max_time_value = 0, int block =0); - /// Do I need this -- Bala?? - virtual int handle_client_input (int block = 0, - ACE_Time_Value *max_time_value = 0); - virtual int register_handler (void); + /// @@TODO: These methods IMHO should have more meaningful + /// names. The names seem to indicate nothing. virtual int send_request (TAO_Stub *stub, TAO_ORB_Core *orb_core, TAO_OutputCDR &stream, diff --git a/TAO/tao/IIOP_Transport.i b/TAO/tao/IIOP_Transport.i index cce8f76fb53..81bf354f364 100644 --- a/TAO/tao/IIOP_Transport.i +++ b/TAO/tao/IIOP_Transport.i @@ -1,8 +1,2 @@ // -*- C++ -*- //$Id$ - -//ACE_INLINE void -//TAO_IIOP_Transport::use_lite (CORBA::Boolean /*flag */) -//{ - // this->lite_flag_ = flag; -//} diff --git a/TAO/tao/Pluggable.cpp b/TAO/tao/Pluggable.cpp index e1ef866ccbe..4f5d50131f7 100644 --- a/TAO/tao/Pluggable.cpp +++ b/TAO/tao/Pluggable.cpp @@ -203,20 +203,6 @@ TAO_Transport::read_process_message (ACE_Time_Value * /* max_wait_time */, ACE_NOTSUP_RETURN (-1); } -// Read and handle the reply. Returns 0 when there is Short Read on -// the connection. Returns 1 when the full reply is read and -// handled. Returns -1 on errors. -// If <block> is 1, then reply is read in a blocking manner. - - - -int -TAO_Transport::handle_client_input (int /* block */, - ACE_Time_Value * /* max_wait_time */) -{ - ACE_NOTSUP_RETURN (-1); -} - int TAO_Transport::register_handler (void) { diff --git a/TAO/tao/Pluggable.h b/TAO/tao/Pluggable.h index 93419b025c5..a5f94b17f54 100644 --- a/TAO/tao/Pluggable.h +++ b/TAO/tao/Pluggable.h @@ -181,13 +181,6 @@ public: // been successfully read, the message is processed by delegating // the responsibility to the underlying messaging object. - virtual int handle_client_input (int block = 0, - ACE_Time_Value *max_wait_time = 0); - // Read and handle the reply. Returns 0 when there is Short Read on - // the connection. Returns 1 when the full reply is read and - // handled. Returns -1 on errors. - // If <block> is 1, then reply is read in a blocking manner. - virtual int register_handler (void); // Register the handler with the reactor. Will be called by the Wait // Strategy if Reactor is used for that strategy. Default diff --git a/TAO/tao/Strategies/SHMIOP_Acceptor.h b/TAO/tao/Strategies/SHMIOP_Acceptor.h index c05c2e94b59..4c8b6c27e74 100644 --- a/TAO/tao/Strategies/SHMIOP_Acceptor.h +++ b/TAO/tao/Strategies/SHMIOP_Acceptor.h @@ -30,11 +30,11 @@ #if defined (TAO_HAS_SHMIOP) && (TAO_HAS_SHMIOP != 0) #include "tao/Pluggable.h" -#include "SHMIOP_Connect.h" +#include "SHMIOP_Connection_Handler.h" #include "tao/Acceptor_Impl.h" #include "ace/Acceptor.h" #include "ace/MEM_Acceptor.h" - +#include "tao/GIOP_Message_State.h" // TAO SHMIOP_Acceptor concrete call defination class TAO_Strategies_Export TAO_SHMIOP_Acceptor : public TAO_Acceptor @@ -52,10 +52,10 @@ public: ~TAO_SHMIOP_Acceptor (void); // Destructor. - typedef ACE_Strategy_Acceptor<TAO_SHMIOP_Server_Connection_Handler, ACE_MEM_ACCEPTOR> TAO_SHMIOP_BASE_ACCEPTOR; - typedef TAO_Creation_Strategy<TAO_SHMIOP_Server_Connection_Handler> TAO_SHMIOP_CREATION_STRATEGY; - typedef TAO_Concurrency_Strategy<TAO_SHMIOP_Server_Connection_Handler> TAO_SHMIOP_CONCURRENCY_STRATEGY; - typedef TAO_Accept_Strategy<TAO_SHMIOP_Server_Connection_Handler, ACE_MEM_ACCEPTOR> TAO_SHMIOP_ACCEPT_STRATEGY; + typedef ACE_Strategy_Acceptor<TAO_SHMIOP_Connection_Handler, ACE_MEM_ACCEPTOR> TAO_SHMIOP_BASE_ACCEPTOR; + typedef TAO_Creation_Strategy<TAO_SHMIOP_Connection_Handler> TAO_SHMIOP_CREATION_STRATEGY; + typedef TAO_Concurrency_Strategy<TAO_SHMIOP_Connection_Handler> TAO_SHMIOP_CONCURRENCY_STRATEGY; + typedef TAO_Accept_Strategy<TAO_SHMIOP_Connection_Handler, ACE_MEM_ACCEPTOR> TAO_SHMIOP_ACCEPT_STRATEGY; // = The TAO_Acceptor methods, check the documentation in // Pluggable.h for details. diff --git a/TAO/tao/Strategies/SHMIOP_Connection_Handler.cpp b/TAO/tao/Strategies/SHMIOP_Connection_Handler.cpp new file mode 100644 index 00000000000..a510251e55c --- /dev/null +++ b/TAO/tao/Strategies/SHMIOP_Connection_Handler.cpp @@ -0,0 +1,314 @@ +// $Id$ + +#include "SHMIOP_Connection_Handler.h" + +#if defined (TAO_HAS_SHMIOP) && (TAO_HAS_SHMIOP != 0) + +#include "tao/Timeprobe.h" +#include "tao/debug.h" +#include "tao/ORB_Core.h" +#include "tao/ORB.h" +#include "tao/CDR.h" +#include "tao/Messaging_Policy_i.h" +#include "tao/GIOP_Message_Base.h" +#include "tao/GIOP_Message_Lite.h" +#include "tao/Server_Strategy_Factory.h" +#include "tao/Base_Connection_Property.h" +#include "SHMIOP_Endpoint.h" + +#if !defined (__ACE_INLINE__) +# include "SHMIOP_Connection_Handler.inl" +#endif /* ! __ACE_INLINE__ */ + +ACE_RCSID(Strategies, SHMIOP_Connect, "$Id$") + + + +TAO_SHMIOP_Connection_Handler::TAO_SHMIOP_Connection_Handler (ACE_Thread_Manager *t) + : TAO_SHMIOP_SVC_HANDLER (t, 0 , 0), + TAO_Connection_Handler (0), + transport_ (this, 0, 0), + refcount_ (1) +{ + // This constructor should *never* get called, it is just here to + // make the compiler happy: the default implementation of the + // Creation_Strategy requires a constructor with that signature, we + // don't use that implementation, but some (most?) compilers + // instantiate it anyway. + ACE_ASSERT (this->orb_core () != 0); +} + + +TAO_SHMIOP_Connection_Handler::TAO_SHMIOP_Connection_Handler (TAO_ORB_Core *orb_core, + CORBA::Boolean flag, + void *) + : TAO_SHMIOP_SVC_HANDLER (orb_core->thr_mgr (), 0, 0), + TAO_Connection_Handler (orb_core), + transport_ (this, orb_core, flag), + refcount_ (1) +{ +} + + +TAO_SHMIOP_Connection_Handler::~TAO_SHMIOP_Connection_Handler (void) +{ + + // If the socket has not already been closed. + if (this->get_handle () != ACE_INVALID_HANDLE) + { + // Cannot deal with errors, and therefore they are ignored. + this->transport_.send_buffered_messages (); + } + else + { + // Dequeue messages and delete message blocks. + this->transport_.dequeue_all (); + } +} + + +// @@ Should I do something here to enable non-blocking?? (Alex). +// @@ Alex: I don't know if this is the place to do it, but the way to +// do it is: +// if (this->peer ().enable (ACE_NONBLOCK) == -1) +// return -1; +// Probably we will need to use the transport to decide if it is +// needed or not. + +int +TAO_SHMIOP_Connection_Handler::open (void*) +{ + if (this->set_socket_option (this->peer (), + this->orb_core ()->orb_params ()->sock_sndbuf_size (), + this->orb_core ()->orb_params ()->sock_rcvbuf_size ()) + == -1) + return -1; +#if !defined (ACE_LACKS_TCP_NODELAY) + + int nodelay = + this->orb_core ()->orb_params ()->nodelay (); + + if (this->peer ().set_option (ACE_IPPROTO_TCP, + TCP_NODELAY, + (void *) &nodelay, + sizeof (int)) == -1) + return -1; +#endif /* ! ACE_LACKS_TCP_NODELAY */ + + // Called by the <Strategy_Acceptor> when the handler is + // completely connected. + ACE_INET_Addr addr; + + char client[MAXHOSTNAMELEN + 16]; + + // Get the peername. + if (this->peer ().get_remote_addr (addr) == -1) + return -1; + + // Verify that we can resolve the peer hostname. + else if (addr.addr_to_string (client, sizeof (client)) == -1) + return -1; + + if (TAO_debug_level > 0) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) SHMIOP connection from client") + ACE_TEXT ("<%s> on %d\n"), + client, this->peer ().get_handle ())); + } + + return 0; +} + +int +TAO_SHMIOP_Connection_Handler::activate (long flags, + int n_threads, + int force_active, + long priority, + int grp_id, + ACE_Task_Base *task, + ACE_hthread_t thread_handles[], + void *stack[], + size_t stack_size[], + ACE_thread_t thread_names[]) +{ + if (TAO_debug_level) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) SHMIOP_Connection_Handler::activate %d ") + ACE_TEXT ("threads, flags = %d\n"), + n_threads, + flags, + THR_BOUND)); + + return TAO_SHMIOP_SVC_HANDLER::activate (flags, + n_threads, + force_active, + priority, + grp_id, + task, + thread_handles, + stack, + stack_size, + thread_names); +} + +int +TAO_SHMIOP_Connection_Handler::svc (void) +{ + // This method is called when an instance is "activated", i.e., + // turned into an active object. Presumably, activation spawns a + // thread with this method as the "worker function". + + // Call the implementation here + return this->svc_i (); +} + + +int +TAO_SHMIOP_Connection_Handler::handle_close (ACE_HANDLE handle, + ACE_Reactor_Mask rm) +{ + // @@ Alex: we need to figure out if the transport decides to close + // us or something else. If it is something else (for example + // the cached connector trying to make room for other + // connections) then we should let the transport know, so it can + // in turn take appropiate action (such as sending exceptions to + // all waiting reply handlers). + if (TAO_debug_level) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) ") + ACE_TEXT ("SHMIOP_Connection_Handler::handle_close ") + ACE_TEXT ("(%d, %d)\n"), + handle, + rm)); + + --this->refcount_; + if (this->refcount_ == 0 && + this->is_registered ()) + { + // Make sure there are no timers. + this->reactor ()->cancel_timer (this); + + // Set the flag to indicate that it is no longer registered with + // the reactor, so that it isn't included in the set that is + // passed to the reactor on ORB destruction. + this->is_registered (0); + + // Decrement the reference count + this->decr_ref_count (); + } + + return 0; +} + +ACE_HANDLE +TAO_SHMIOP_Connection_Handler::fetch_handle (void) +{ + return this->get_handle (); +} + + +int +TAO_SHMIOP_Connection_Handler::handle_timeout (const ACE_Time_Value &, + const void *) +{ + // This method is called when buffering timer expires. + // + ACE_Time_Value *max_wait_time = 0; + + TAO_Stub *stub = 0; + int has_timeout; + this->orb_core ()->call_timeout_hook (stub, + has_timeout, + *max_wait_time); + + // Cannot deal with errors, and therefore they are ignored. + this->transport ()->send_buffered_messages (max_wait_time); + + return 0; +} + + +int +TAO_SHMIOP_Connection_Handler::close (u_long) +{ + this->destroy (); + + return 0; +} + + +int +TAO_SHMIOP_Connection_Handler::add_handler_to_cache (void) +{ + ACE_INET_Addr addr; + + // Get the peername. + if (this->peer ().get_remote_addr (addr) == -1) + return -1; + + // Construct an SHMIOP_Endpoint object + TAO_SHMIOP_Endpoint endpoint (addr, + 0); + + // Construct a property object + TAO_Base_Connection_Property prop (&endpoint); + + // Add the handler to Cache + return this->orb_core ()->connection_cache ().cache_handler (&prop, + this); +} + + +int +TAO_SHMIOP_Connection_Handler::handle_input (ACE_HANDLE h) +{ + return this->handle_input_i (h); +} + + +int +TAO_SHMIOP_Connection_Handler::handle_input_i (ACE_HANDLE, + ACE_Time_Value *max_wait_time) +{ + this->refcount_++; + + // Call the transport read the message + int result = this->transport_.read_process_message (max_wait_time); + + // Now the message has been read + if (result == -1 && TAO_debug_level > 0) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - %p\n"), + ACE_TEXT ("SHMIOP_Connection_Handler::read_message \n"))); + + } + + // The upcall is done. Bump down the reference count + --this->refcount_; + if (this->refcount_ == 0) + this->decr_ref_count (); + + if (result == 0 || result == -1) + { + return result; + } + + return 0; +} + + + +// **************************************************************** +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) + +template class _Svc_Handler<ACE_MEM_STREAM, ACE_NULL_SYNCH>; + +#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) + +#pragma instantiate ACE_Svc_Handler<ACE_MEM_STREAM, ACE_NULL_SYNCH> + +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ + +#endif /*(TAO_HAS_SHMIOP) && (TAO_HAS_SHMIOP != 0) */ diff --git a/TAO/tao/Strategies/SHMIOP_Connection_Handler.h b/TAO/tao/Strategies/SHMIOP_Connection_Handler.h new file mode 100644 index 00000000000..be17915c0bd --- /dev/null +++ b/TAO/tao/Strategies/SHMIOP_Connection_Handler.h @@ -0,0 +1,138 @@ +// This may look like C, but it's really -*- C++ -*- + +// =================================================================== +/** + * @file SHMIOP_Connection_Handler.h + * + * $Id$ + * + * @author Originally by Nanbor Wang <nanbor@cs.wustl.edu> as UIOP_Connect.h + * @author modified by Balachandran Natarajan <bala@cs.wustl.edu> + */ +// =================================================================== + +#ifndef TAO_SHMIOP_CONNECT_H +#define TAO_SHMIOP_CONNECT_H +#include "ace/pre.h" + +#include "tao/corbafwd.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +#pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#if defined (TAO_HAS_SHMIOP) && (TAO_HAS_SHMIOP != 0) + +#include "ace/Reactor.h" +#include "ace/Acceptor.h" +#include "tao/corbafwd.h" +#include "tao/Wait_Strategy.h" +#include "tao/Connection_Handler.h" +#include "SHMIOP_Transport.h" + +// **************************************************************** +/** + * @class TAO_SHMIOP_Connection_Handler + * + * @brief Handles requests on a single connection. + * + * The Connection handler which is common for the Acceptor and + * the Connector + */ + + +class TAO_Strategies_Export TAO_SHMIOP_Connection_Handler : public TAO_SHMIOP_SVC_HANDLER, + public TAO_Connection_Handler +{ + +public: + + TAO_SHMIOP_Connection_Handler (ACE_Thread_Manager* t = 0); + + /// Constructor. <arg> parameter is used by the Acceptor to pass the + /// protocol configuration properties for this connection. + TAO_SHMIOP_Connection_Handler (TAO_ORB_Core *orb_core, + CORBA::Boolean flag, + void *arg); + + + /// Destructor. + ~TAO_SHMIOP_Connection_Handler (void); + + /// Called by the <Strategy_Acceptor> when the handler is completely + /// connected. Argument is unused. + virtual int open (void *); + + + /// = Active object activation method. + virtual int activate (long flags = THR_NEW_LWP, + int n_threads = 1, + int force_active = 0, + long priority = ACE_DEFAULT_THREAD_PRIORITY, + int grp_id = -1, + ACE_Task_Base *task = 0, + ACE_hthread_t thread_handles[] = 0, + void *stack[] = 0, + size_t stack_size[] = 0, + ACE_thread_t thread_names[] = 0); + + /// Only used when the handler is turned into an active object by + /// calling <activate>. This serves as the event loop in such cases. + virtual int svc (void); + + /// Perform appropriate closing. + virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE, + ACE_Reactor_Mask = ACE_Event_Handler::NULL_MASK); + + /// Return the underlying transport object + TAO_Transport *transport (void); + + /// Return the underlying handle + virtual ACE_HANDLE fetch_handle (void); + + /// Called when buffering timer expires. + virtual int handle_timeout (const ACE_Time_Value &tv, + const void *arg = 0); + + /// Object termination hook. + virtual int close (u_long flags = 0); + + /// Add ourselves to Cache. + int add_handler_to_cache (void); + +protected: + + /// = Event Handler overloads + + /// Reads a message from the <peer()>, dispatching and servicing it + /// appropriately. + /// handle_input() just delegates on handle_input_i() which timeouts + /// after <max_wait_time>, this is used in thread-per-connection to + /// ensure that server threads eventually exit. + + virtual int handle_input (ACE_HANDLE = ACE_INVALID_HANDLE); + virtual int handle_input_i (ACE_HANDLE = ACE_INVALID_HANDLE, + ACE_Time_Value *max_wait_time = 0); + +private: + + /// Transport object reference. + TAO_SHMIOP_Transport transport_; + + /// Reference count.It is used to count nested upcalls on this + /// svc_handler i.e., the connection can close during nested upcalls, + /// you should not delete the svc_handler until the stack unwinds + /// from the nested upcalls. + u_long refcount_; +}; + + + +#if defined (__ACE_INLINE__) +#include "SHMIOP_Connection_Handler.inl" +#endif /* __ACE_INLINE__ */ + +#endif /* TAO_HAS_SHMIOP && TAO_HAS_SHMIOP != 0 */ + +#include "ace/post.h" +#endif /* TAO_SHMIOP_CONNECT_H */ diff --git a/TAO/tao/Strategies/SHMIOP_Connection_Handler.inl b/TAO/tao/Strategies/SHMIOP_Connection_Handler.inl new file mode 100644 index 00000000000..18aa83c3daf --- /dev/null +++ b/TAO/tao/Strategies/SHMIOP_Connection_Handler.inl @@ -0,0 +1,8 @@ +// -*- C++ -*- +// $Id$ + +ACE_INLINE TAO_Transport * +TAO_SHMIOP_Connection_Handler::transport (void) +{ + return &(this->transport_); +} diff --git a/TAO/tao/Strategies/SHMIOP_Connector.cpp b/TAO/tao/Strategies/SHMIOP_Connector.cpp index e2bf964478a..ae5fe1b5071 100644 --- a/TAO/tao/Strategies/SHMIOP_Connector.cpp +++ b/TAO/tao/Strategies/SHMIOP_Connector.cpp @@ -15,37 +15,6 @@ ACE_RCSID(Strategies, SHMIOP_Connector, "$Id$") - -TAO_SHMIOP_Connect_Creation_Strategy:: - TAO_SHMIOP_Connect_Creation_Strategy (ACE_Thread_Manager* t, - TAO_ORB_Core *orb_core, - CORBA::Boolean flag) - : ACE_Creation_Strategy<TAO_SHMIOP_Client_Connection_Handler> (t), - orb_core_ (orb_core), - lite_flag_ (flag) -{ -} - -TAO_SHMIOP_Connect_Creation_Strategy:: - ~TAO_SHMIOP_Connect_Creation_Strategy (void) -{ -} - -int -TAO_SHMIOP_Connect_Creation_Strategy::make_svc_handler - (TAO_SHMIOP_Client_Connection_Handler *&sh) -{ - if (sh == 0) - ACE_NEW_RETURN (sh, - TAO_SHMIOP_Client_Connection_Handler - (this->orb_core_->thr_mgr (), - this->orb_core_), - -1); - return 0; -} - -// **************************************************************** - TAO_SHMIOP_Connector::TAO_SHMIOP_Connector (CORBA::Octet flag) : TAO_Connector (TAO_TAG_SHMEM_PROFILE), base_connector_ (), @@ -61,27 +30,36 @@ TAO_SHMIOP_Connector::~TAO_SHMIOP_Connector (void) int TAO_SHMIOP_Connector::open (TAO_ORB_Core *orb_core) { - this->orb_core (orb_core); + this->orb_core (orb_core); - TAO_SHMIOP_Connect_Creation_Strategy *connect_creation_strategy = 0; + /// Our connect creation strategy + TAO_SHMIOP_CONNECT_CREATION_STRATEGY *connect_creation_strategy = 0; ACE_NEW_RETURN (connect_creation_strategy, - TAO_SHMIOP_Connect_Creation_Strategy - (this->orb_core ()->thr_mgr (), - this->orb_core (), - this->lite_flag_), + TAO_SHMIOP_CONNECT_CREATION_STRATEGY + (orb_core->thr_mgr (), + orb_core, + 0, + this->lite_flag_), -1); + /// Our activation strategy + TAO_SHMIOP_CONNECT_CONCURRENCY_STRATEGY *concurrency_strategy = 0; + + ACE_NEW_RETURN (concurrency_strategy, + TAO_SHMIOP_CONNECT_CONCURRENCY_STRATEGY (orb_core), + -1); return this->base_connector_.open (this->orb_core ()->reactor (), connect_creation_strategy, &this->connect_strategy_, - &this->null_activation_strategy_); + concurrency_strategy); } int TAO_SHMIOP_Connector::close (void) { + delete this->base_connector_.concurrency_strategy (); delete this->base_connector_.creation_strategy (); return this->base_connector_.close (); } @@ -130,7 +108,7 @@ TAO_SHMIOP_Connector::connect (TAO_Connection_Descriptor_Interface *desc, } int result = 0; - TAO_SHMIOP_Client_Connection_Handler *svc_handler = 0; + TAO_SHMIOP_Connection_Handler *svc_handler = 0; TAO_Connection_Handler *conn_handler = 0; // Check the Cache first for connections @@ -144,7 +122,7 @@ TAO_SHMIOP_Connector::connect (TAO_Connection_Descriptor_Interface *desc, // We have found a connection and a handler svc_handler = - ACE_dynamic_cast (TAO_SHMIOP_Client_Connection_Handler *, + ACE_dynamic_cast (TAO_SHMIOP_Connection_Handler *, conn_handler); } else @@ -301,7 +279,7 @@ TAO_SHMIOP_Connector::preconnect (const char *preconnects) // array of eventual handlers. num_connections = dests.size (); ACE_INET_Addr *remote_addrs = 0; - TAO_SHMIOP_Client_Connection_Handler **handlers = 0; + TAO_SHMIOP_Connection_Handler **handlers = 0; char *failures = 0; ACE_NEW_RETURN (remote_addrs, @@ -311,10 +289,10 @@ TAO_SHMIOP_Connector::preconnect (const char *preconnects) ACE_Auto_Basic_Array_Ptr<ACE_INET_Addr> safe_remote_addrs (remote_addrs); ACE_NEW_RETURN (handlers, - TAO_SHMIOP_Client_Connection_Handler *[num_connections], + TAO_SHMIOP_Connection_Handler *[num_connections], -1); - ACE_Auto_Basic_Array_Ptr<TAO_SHMIOP_Client_Connection_Handler*> + ACE_Auto_Basic_Array_Ptr<TAO_SHMIOP_Connection_Handler*> safe_handlers (handlers); ACE_NEW_RETURN (failures, @@ -462,7 +440,7 @@ TAO_SHMIOP_Connector::object_key_delimiter (void) const #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) -template class ACE_Concurrency_Strategy<TAO_SHMIOP_Client_Connection_Handler>; +template class ACE_Concurrency_Strategy<TAO_SHMIOP_Connection_Handler>; template class ACE_Creation_Strategy<TAO_SHMIOP_Client_Connection_Handler>; template class ACE_Strategy_Connector<TAO_SHMIOP_Client_Connection_Handler, ACE_MEM_CONNECTOR>; template class ACE_Connect_Strategy<TAO_SHMIOP_Client_Connection_Handler, ACE_MEM_CONNECTOR>; diff --git a/TAO/tao/Strategies/SHMIOP_Connector.h b/TAO/tao/Strategies/SHMIOP_Connector.h index 54bc074e7dc..7cab3524167 100644 --- a/TAO/tao/Strategies/SHMIOP_Connector.h +++ b/TAO/tao/Strategies/SHMIOP_Connector.h @@ -32,39 +32,9 @@ #include "ace/Connector.h" #include "ace/MEM_Connector.h" #include "tao/Pluggable.h" -#include "SHMIOP_Connect.h" +#include "SHMIOP_Connection_Handler.h" #include "tao/Resource_Factory.h" - -// **************************************************************** - -class TAO_Strategies_Export TAO_SHMIOP_Connect_Creation_Strategy : public ACE_Creation_Strategy<TAO_SHMIOP_Client_Connection_Handler> -{ - // = TITLE - // Helper creation strategy - // - // = DESCRIPTION - // Creates SHMIOP_Client_Connection_Handler objects but satisfies - // the interface required by the - // ACE_Creation_Strategy<TAO_SHMIOP_Client_Connection_Handler> - // -public: - TAO_SHMIOP_Connect_Creation_Strategy (ACE_Thread_Manager * = 0, - TAO_ORB_Core* orb_core = 0, - CORBA::Boolean flag = 0); - - ~TAO_SHMIOP_Connect_Creation_Strategy (void); - // Default destructor - - virtual int make_svc_handler (TAO_SHMIOP_Client_Connection_Handler *&sh); - // Makes TAO_SHMIOP_Client_Connection_Handlers - -private: - TAO_ORB_Core* orb_core_; - // The ORB - - CORBA::Boolean lite_flag_; - // Are we using lite? -}; +#include "tao/Connector_Impl.h" // **************************************************************** @@ -110,35 +80,33 @@ protected: public: - typedef ACE_Concurrency_Strategy<TAO_SHMIOP_Client_Connection_Handler> - TAO_ACTIVATION_STRATEGY; + typedef TAO_Connect_Concurrency_Strategy<TAO_SHMIOP_Connection_Handler> + TAO_SHMIOP_CONNECT_CONCURRENCY_STRATEGY; + + typedef TAO_Connect_Creation_Strategy<TAO_SHMIOP_Connection_Handler> + TAO_SHMIOP_CONNECT_CREATION_STRATEGY; - typedef ACE_Connect_Strategy<TAO_SHMIOP_Client_Connection_Handler, + typedef ACE_Connect_Strategy<TAO_SHMIOP_Connection_Handler, ACE_MEM_CONNECTOR> - TAO_CONNECT_STRATEGY ; + TAO_SHMIOP_CONNECT_STRATEGY ; - typedef ACE_Strategy_Connector<TAO_SHMIOP_Client_Connection_Handler, + typedef ACE_Strategy_Connector<TAO_SHMIOP_Connection_Handler, ACE_MEM_CONNECTOR> TAO_SHMIOP_BASE_CONNECTOR; - private: + ACE_MEM_Addr address_; // local address - TAO_ACTIVATION_STRATEGY null_activation_strategy_; - // Our activation strategy - - TAO_CONNECT_STRATEGY connect_strategy_; - // Our connect strategy + /// Our connect strategy + TAO_SHMIOP_CONNECT_STRATEGY connect_strategy_; + /// The connector initiating connection requests for IIOP. TAO_SHMIOP_BASE_CONNECTOR base_connector_; - // The connector initiating connection requests for IIOP. - - TAO_SHMIOP_Connect_Creation_Strategy creation_strategy_; - // Our creation strategy + /// Are we using GIOP lite?? CORBA::Boolean lite_flag_; - // Do we need to use a GIOP_Lite for sending messages? + }; #endif /* TAO_HAS_SHMIOP && TAO_HAS_SHMIOP != 0 */ diff --git a/TAO/tao/Strategies/SHMIOP_Endpoint.cpp b/TAO/tao/Strategies/SHMIOP_Endpoint.cpp index 23fd0803076..ad826bc9b4c 100644 --- a/TAO/tao/Strategies/SHMIOP_Endpoint.cpp +++ b/TAO/tao/Strategies/SHMIOP_Endpoint.cpp @@ -6,7 +6,7 @@ #if defined (TAO_HAS_SHMIOP) && (TAO_HAS_SHMIOP != 0) -#include "SHMIOP_Connect.h" +#include "SHMIOP_Connection_Handler.h" #include "tao/debug.h" ACE_RCSID(Strategies, SHMIOP_Endpoint, "$Id$") @@ -21,7 +21,6 @@ TAO_SHMIOP_Endpoint::TAO_SHMIOP_Endpoint (const ACE_MEM_Addr &addr, host_ (), port_ (0), object_addr_ (addr.get_remote_addr ()), - hint_ (0), next_ (0) { this->set (addr.get_remote_addr (), use_dotted_decimal_addresses); @@ -33,7 +32,6 @@ TAO_SHMIOP_Endpoint::TAO_SHMIOP_Endpoint (const ACE_INET_Addr &addr, host_ (), port_ (0), object_addr_ (addr), - hint_ (0), next_ (0) { this->set (addr, use_dotted_decimal_addresses); @@ -46,7 +44,6 @@ TAO_SHMIOP_Endpoint::TAO_SHMIOP_Endpoint (const char *host, host_ (), port_ (port), object_addr_ (addr), - hint_ (0), next_ (0) { if (host != 0) @@ -58,7 +55,6 @@ TAO_SHMIOP_Endpoint::TAO_SHMIOP_Endpoint (void) host_ (), port_ (0), object_addr_ (), - hint_ (0), next_ (0) { } @@ -70,7 +66,6 @@ TAO_SHMIOP_Endpoint::TAO_SHMIOP_Endpoint (const char *host, host_ (), port_ (port), object_addr_ (), - hint_ (0), next_ (0) { if (host != 0) @@ -144,8 +139,8 @@ TAO_SHMIOP_Endpoint::host (const char *h) void TAO_SHMIOP_Endpoint::reset_hint (void) { - if (this->hint_) - this->hint_->cleanup_hint ((void **) &this->hint_); + // if (this->hint_) + //this->hint_->cleanup_hint ((void **) &this->hint_); } TAO_Endpoint * diff --git a/TAO/tao/Strategies/SHMIOP_Endpoint.h b/TAO/tao/Strategies/SHMIOP_Endpoint.h index 89134d52264..f06b60d0d08 100644 --- a/TAO/tao/Strategies/SHMIOP_Endpoint.h +++ b/TAO/tao/Strategies/SHMIOP_Endpoint.h @@ -37,7 +37,7 @@ #include "ace/INET_Addr.h" #include "ace/MEM_Addr.h" -class TAO_SHMIOP_Client_Connection_Handler; + class TAO_Strategies_Export TAO_SHMIOP_Endpoint : public TAO_Endpoint { @@ -114,7 +114,7 @@ public: CORBA::UShort port (CORBA::UShort p); // Set the port number. - TAO_SHMIOP_Client_Connection_Handler *&hint (void); + // TAO_SHMIOP_Client_Connection_Handler *&hint (void); // Access to our <hint_>. private: @@ -132,7 +132,7 @@ private: // Cached instance of <ACE_INET_Addr> for use in making // invocations, etc. - TAO_SHMIOP_Client_Connection_Handler *hint_; + // TAO_SHMIOP_Client_Connection_Handler *hint_; // Hint indicating the last successfully used connection handler for // a connection established through this endpoint's acceptor. diff --git a/TAO/tao/Strategies/SHMIOP_Endpoint.i b/TAO/tao/Strategies/SHMIOP_Endpoint.i index 1de11903fae..499cc64ebe0 100644 --- a/TAO/tao/Strategies/SHMIOP_Endpoint.i +++ b/TAO/tao/Strategies/SHMIOP_Endpoint.i @@ -51,8 +51,8 @@ TAO_SHMIOP_Endpoint::port (CORBA::UShort p) return this->port_ = p; } -ACE_INLINE TAO_SHMIOP_Client_Connection_Handler *& +/*ACE_INLINE TAO_SHMIOP_Client_Connection_Handler *& TAO_SHMIOP_Endpoint::hint (void) { return this->hint_; -} +}*/ diff --git a/TAO/tao/Strategies/SHMIOP_Transport.cpp b/TAO/tao/Strategies/SHMIOP_Transport.cpp index 8bde95a7f30..518c49d7f73 100644 --- a/TAO/tao/Strategies/SHMIOP_Transport.cpp +++ b/TAO/tao/Strategies/SHMIOP_Transport.cpp @@ -7,7 +7,7 @@ ACE_RCSID (Strategies, SHMIOP_Transport, "$Id$") -#include "SHMIOP_Connect.h" +#include "SHMIOP_Connection_Handler.h" #include "SHMIOP_Profile.h" #include "tao/Timeprobe.h" #include "tao/CDR.h" @@ -19,461 +19,409 @@ ACE_RCSID (Strategies, SHMIOP_Transport, "$Id$") #include "tao/debug.h" #include "tao/GIOP_Message_Lite.h" -#include "tao/GIOP_Message_Connectors.h" +#include "tao/GIOP_Message_Base.h" #if !defined (__ACE_INLINE__) # include "SHMIOP_Transport.i" #endif /* ! __ACE_INLINE__ */ -#if defined (ACE_ENABLE_TIMEPROBES) -static const char *TAO_Transport_Timeprobe_Description[] = - { - "SHMIOP_Transport::send - start", - "SHMIOP_Transport::send - end", - - "SHMIOP_Transport::receive - start", - "SHMIOP_Transport::receive - end", - - "SHMIOP_Client_Transport::start_request - start", - "SHMIOP_Client_Transport::start_request - end" - }; - -enum - { - TAO_SHMIOP_TRANSPORT_SEND_START = 1200, - TAO_SHMIOP_TRANSPORT_SEND_END, - - TAO_SHMIOP_TRANSPORT_RECEIVE_START, - TAO_SHMIOP_TRANSPORT_RECEIVE_END, - - TAO_SHMIOP_CLIENT_TRANSPORT_START_REQUEST_START, - TAO_SHMIOP_CLIENT_TRANSPORT_START_REQUEST_END - }; - - -// Setup Timeprobes -ACE_TIMEPROBE_EVENT_DESCRIPTIONS (TAO_Transport_Timeprobe_Description, - TAO_SHMIOP_TRANSPORT_SEND_START); - -#endif /* ACE_ENABLE_TIMEPROBES */ - -TAO_SHMIOP_Transport::TAO_SHMIOP_Transport (TAO_ORB_Core *orb_core) +TAO_SHMIOP_Transport::TAO_SHMIOP_Transport (TAO_SHMIOP_Connection_Handler *handler, + TAO_ORB_Core *orb_core, + CORBA::Boolean flag) : TAO_Transport (TAO_TAG_SHMEM_PROFILE, - orb_core) + orb_core), + connection_handler_ (handler), + messaging_object_ (0) { + if (flag) + { + // Use the lite version of the protocol + ACE_NEW (this->messaging_object_, + TAO_GIOP_Message_Lite (orb_core)); + } + else + { + // Use the normal GIOP object + ACE_NEW (this->messaging_object_, + TAO_GIOP_Message_Base (orb_core)); + } } TAO_SHMIOP_Transport::~TAO_SHMIOP_Transport (void) { + delete this->messaging_object_; } - +TAO_SHMIOP_SVC_HANDLER * +TAO_SHMIOP_Transport::service_handler (void) +{ + return this->connection_handler_; +} ACE_HANDLE TAO_SHMIOP_Transport::handle (void) { - return this->service_handler ()->get_handle (); + return this->connection_handler_->get_handle (); } - ACE_Event_Handler * TAO_SHMIOP_Transport::event_handler (void) { - return this->service_handler (); + return this->connection_handler_; } -// **************************************************************** - -TAO_SHMIOP_Server_Transport:: - TAO_SHMIOP_Server_Transport (TAO_SHMIOP_Server_Connection_Handler *handler, - TAO_ORB_Core* orb_core) - : TAO_SHMIOP_Transport (orb_core), - message_state_ (orb_core), - handler_ (handler) +void +TAO_SHMIOP_Transport::close_connection (void) { -} + // Now close the handler + this->connection_handler_->handle_close (); -TAO_SHMIOP_Server_Transport::~TAO_SHMIOP_Server_Transport (void) -{ + // Purge the entry from the Cache map first and then close the + // handler + this->connection_handler_->purge_entry (); } int -TAO_SHMIOP_Server_Transport::idle (void) +TAO_SHMIOP_Transport::idle (void) { - return this->handler_->make_idle(); + return this->connection_handler_->make_idle (); } -TAO_SHMIOP_SVC_HANDLER * -TAO_SHMIOP_Server_Transport::service_handler (void) + +ssize_t +TAO_SHMIOP_Transport::send (TAO_Stub *stub, + int two_way, + const ACE_Message_Block *message_block, + const ACE_Time_Value *max_wait_time) { - return this->handler_; + if (stub == 0 || two_way) + { + return this->send (message_block, + max_wait_time); + } + else + { + TAO_Sync_Strategy &sync_strategy = stub->sync_strategy (); + + return sync_strategy.send (*this, + *stub, + message_block, + max_wait_time); + } } -void -TAO_SHMIOP_Server_Transport::close_connection (void) + +ssize_t +TAO_SHMIOP_Transport::send (const ACE_Message_Block *message_block, + const ACE_Time_Value *max_wait_time, + size_t *) { - this->handler_->purge_entry (); - this->service_handler ()->handle_close (); + return this->service_handler ()->peer ().send (message_block, + max_wait_time); } -// **************************************************************** - -TAO_SHMIOP_Client_Transport:: - TAO_SHMIOP_Client_Transport (TAO_SHMIOP_Client_Connection_Handler *handler, - TAO_ORB_Core *orb_core) - : TAO_SHMIOP_Transport (orb_core), - handler_ (handler), - client_mesg_factory_ (0), - orb_core_ (orb_core), - lite_flag_ (0), - params_ () +ssize_t +TAO_SHMIOP_Transport::send (const u_char *buf, + size_t len, + const ACE_Time_Value *max_wait_time) { + return this->service_handler ()->peer ().send (buf, + len, + max_wait_time); } -TAO_SHMIOP_Client_Transport::~TAO_SHMIOP_Client_Transport (void) +ssize_t +TAO_SHMIOP_Transport::recv (char *buf, + size_t len, + const ACE_Time_Value *max_wait_time) { - delete this->client_mesg_factory_; + return this->service_handler ()->peer ().recv (buf, + len, + max_wait_time); } + int -TAO_SHMIOP_Client_Transport::idle (void) +TAO_SHMIOP_Transport::read_process_message (ACE_Time_Value *max_wait_time, + int block) { - return this->handler_->make_idle(); -} + // Read the message of the socket + int result = this->messaging_object_->read_message (this, + block, + max_wait_time); + if (result == -1) + { + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - %p\n"), + ACE_TEXT ("SHMIOP_Transport::read_message, failure in read_message ()"))); -void -TAO_SHMIOP_Client_Transport::start_request (TAO_ORB_Core * /*orb_core*/, - TAO_Target_Specification & /*spec */, - TAO_OutputCDR &output, - CORBA::Environment &ACE_TRY_ENV) - ACE_THROW_SPEC ((CORBA::SystemException)) -{ - TAO_FUNCTION_PP_TIMEPROBE (TAO_SHMIOP_CLIENT_TRANSPORT_START_REQUEST_START); + this->tms_->connection_closed (); + return -1; + } + if (result == 0) + return result; + + // Now we know that we have been able to read the complete message + // here.. + return this->process_message (); - if (this->client_mesg_factory_->write_protocol_header - (TAO_PLUGGABLE_MESSAGE_REQUEST, - output) == 0) - ACE_THROW (CORBA::MARSHAL ()); } -void -TAO_SHMIOP_Client_Transport::start_locate (TAO_ORB_Core * /*orb_core*/, - TAO_Target_Specification &spec, - TAO_Operation_Details &opdetails, - TAO_OutputCDR &output, - CORBA::Environment &ACE_TRY_ENV) - ACE_THROW_SPEC ((CORBA::SystemException)) + +int +TAO_SHMIOP_Transport::register_handler (void) { - // See this is GIOP way of doing this..But anyway SHMIOP will be tied - // up with GIOP. - if (this->client_mesg_factory_->write_protocol_header - (TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST, - output) == 0) - ACE_THROW (CORBA::MARSHAL ()); + // @@ It seems like this method should go away, the right reactor is + // picked at object creation time. + ACE_Reactor *r = this->orb_core_->reactor (); - if (this->client_mesg_factory_->write_message_header (opdetails, - TAO_PLUGGABLE_MESSAGE_LOCATE_REQUEST_HEADER, - spec, - output) == 0) - ACE_THROW (CORBA::MARSHAL ()); + if (r == this->connection_handler_->reactor ()) + return 0; + + // About to be registered with the reactor, so bump the ref + // count + this->connection_handler_->incr_ref_count (); + + // Set the flag in the Connection Handler + this->connection_handler_->is_registered (1); + + + // Register the handler with the reactor + return r->register_handler (this->connection_handler_, + ACE_Event_Handler::READ_MASK); } + int -TAO_SHMIOP_Client_Transport::send_request (TAO_Stub *stub, - TAO_ORB_Core *orb_core, - TAO_OutputCDR &stream, - int two_way, - ACE_Time_Value *max_wait_time) +TAO_SHMIOP_Transport::send_request (TAO_Stub *stub, + TAO_ORB_Core *orb_core, + TAO_OutputCDR &stream, + int two_way, + ACE_Time_Value *max_wait_time) { if (this->ws_->sending_request (orb_core, two_way) == -1) return -1; - if (this->client_mesg_factory_->send_message (this, - stream, - max_wait_time, - stub, - two_way) == -1) + if (this->send_message (stream, + stub, + two_way, + max_wait_time) == -1) + return -1; + return this->idle_after_send (); } -// Return 0, when the reply is not read fully, 1 if it is read fully. -// @@ This code should go in the TAO_Transport class is repeated for -// each transport!! int -TAO_SHMIOP_Client_Transport::handle_client_input (int /* block */, - ACE_Time_Value *max_wait_time) +TAO_SHMIOP_Transport::send_message (TAO_OutputCDR &stream, + TAO_Stub *stub, + int twoway, + ACE_Time_Value *max_wait_time) { + // Format the message in the stream first + if (this->messaging_object_->format_message (stream) != 0) + return -1; - // Notice that the message_state is only modified in one thread at a - // time because the reactor does not call handle_input() for the - // same Event_Handler in two threads at the same time. + // Strictly speaking, should not need to loop here because the + // socket never gets set to a nonblocking mode ... some Linux + // versions seem to need it though. Leaving it costs little. - // Get the message state from the Transport Mux Strategy. - TAO_GIOP_Message_State* message_state = - this->tms_->get_message_state (); + // This guarantees to send all data (bytes) or return an error. + ssize_t n = this->send (stub, + twoway, + stream.begin (), + max_wait_time); - if (message_state == 0) + if (n == -1) { - if (TAO_debug_level > 0) + if (TAO_debug_level) ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) SHMIOP_Transport::handle_client_input -") - ACE_TEXT (" nil message state\n"))); - this->tms_->connection_closed (); - return -1; - } + ACE_TEXT ("TAO: (%P|%t|%N|%l) closing conn %d after fault %p\n"), + this->handle (), + ACE_TEXT ("send_message ()\n"))); - int result = this->client_mesg_factory_->handle_input (this, - this->orb_core_, - *message_state, - max_wait_time); - if (result == -1) - { - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - %p\n"), - ACE_TEXT ("SHMIOP_Transport::handle_client_input, handle_input"))); - this->tms_->connection_closed (); return -1; } - if (result == 0) - return result; - - // OK, the complete message is here... - result = this->client_mesg_factory_->parse_reply (*message_state, - this->params_); - - if (result == -1) + // EOF. + if (n == 0) { - if (TAO_debug_level > 0) + if (TAO_debug_level) ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - %p\n"), - ACE_TEXT ("SHMIOP_Transport::handle_client_input, parse reply"))); - message_state->reset (); - this->tms_->connection_closed (); - return -1; - } - - result = - this->tms_->dispatch_reply (this->params_.request_id_, - this->params_.reply_status_, - message_state->giop_version, - this->params_.svc_ctx_, - message_state); - - if (result == -1) - { - if (TAO_debug_level > 0) - ACE_ERROR ((LM_ERROR, - ACE_TEXT ("TAO (%P|%t) : SHMIOP_Client_Transport::") - ACE_TEXT ("handle_client_input - ") - ACE_TEXT ("dispatch reply failed\n"))); - message_state->reset (); - this->tms_->connection_closed (); + ACE_TEXT ("TAO: (%P|%t|%N|%l) send_message () \n") + ACE_TEXT ("EOF, closing conn %d\n"), + this->handle())); return -1; } - if (result == 0) - { - message_state->reset (); - return 0; - } - - // This is a NOOP for the Exclusive request case, but it actually - // destroys the stream in the muxed case. - this->tms_->destroy_message_state (message_state); - - return result; + return 1; } -int -TAO_SHMIOP_Client_Transport::register_handler (void) -{ - // @@ It seems like this method should go away, the right reactor is - // picked at object creation time. - ACE_Reactor *r = this->orb_core ()->reactor (); - if (r == this->service_handler ()->reactor ()) - return 0; - // About to be registered with the reactor, so bump the ref - // count - this->handler_->incr_ref_count (); +void +TAO_SHMIOP_Transport::start_request (TAO_ORB_Core * /*orb_core*/, + TAO_Target_Specification & /*spec */, + TAO_OutputCDR & /*output */, + CORBA::Environment & /*ACE_TRY_ENV*/) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + // TAO_FUNCTION_PP_TIMEPROBE (TAO_SHMIOP_CLIENT_TRANSPORT_START_REQUEST_START); - // Set the flag in the Connection Handler - this->handler_->is_registered (1); + // @@ This method should NO longer be required.. - // Register the handler with the reactor - return r->register_handler (this->handler_, - ACE_Event_Handler::READ_MASK); + /* if (this->client_mesg_factory_->write_protocol_header + (TAO_PLUGGABLE_MESSAGE_REQUEST, + output) == 0) + ACE_THROW (CORBA::MARSHAL ());*/ } -int -TAO_SHMIOP_Client_Transport::messaging_init (CORBA::Octet major, - CORBA::Octet minor) +void +TAO_SHMIOP_Transport::start_locate (TAO_ORB_Core * /*orb_core*/, + TAO_Target_Specification &spec, + TAO_Operation_Details &opdetails, + TAO_OutputCDR &output, + CORBA::Environment &ACE_TRY_ENV) + ACE_THROW_SPEC ((CORBA::SystemException)) { - if (this->client_mesg_factory_ == 0) - { - if (this->lite_flag_) - { - ACE_NEW_RETURN (this->client_mesg_factory_, - TAO_GIOP_Message_Lite (this->orb_core_), - -1); - } - else if (major == TAO_DEF_GIOP_MAJOR) - { - if (minor > TAO_DEF_GIOP_MINOR) - minor = TAO_DEF_GIOP_MINOR; - switch (minor) - { - case 0: - ACE_NEW_RETURN (this->client_mesg_factory_, - TAO_GIOP_Message_Connector_10, - 0); - break; - case 1: - ACE_NEW_RETURN (this->client_mesg_factory_, - TAO_GIOP_Message_Connector_11, - 0); - break; - case 2: - ACE_NEW_RETURN (this->client_mesg_factory_, - TAO_GIOP_Message_Connector_12, - 0); - break; - default: - if (TAO_debug_level > 0) - { - ACE_ERROR_RETURN ((LM_ERROR, - ACE_TEXT ("(%N|%l|%p|%t) No matching minor version number \n")), - 0); - } - } - } - else - { - if (TAO_debug_level > 0) - { - ACE_ERROR_RETURN ((LM_ERROR, - ACE_TEXT ("(%N|%l|%p|%t) No matching major version number \n")), - 0); - } - } - } - - return 1; + if (this->messaging_object_->generate_locate_request_header (opdetails, + spec, + output) == -1) + ACE_THROW (CORBA::MARSHAL ()); } + CORBA::Boolean -TAO_SHMIOP_Client_Transport::send_request_header (TAO_Operation_Details &opdetails, - TAO_Target_Specification &spec, - TAO_OutputCDR & msg) +TAO_SHMIOP_Transport::send_request_header (TAO_Operation_Details &opdetails, + TAO_Target_Specification &spec, + TAO_OutputCDR &msg) { // We are going to pass on this request to the underlying messaging // layer. It should take care of this request - CORBA::Boolean retval = - this->client_mesg_factory_->write_message_header (opdetails, - TAO_PLUGGABLE_MESSAGE_REQUEST_HEADER, + if (this->messaging_object_->generate_request_header (opdetails, spec, - msg); + msg) == -1) + return 0; - return retval; + return 1; } - -TAO_SHMIOP_SVC_HANDLER * -TAO_SHMIOP_Client_Transport::service_handler (void) +int +TAO_SHMIOP_Transport::messaging_init (CORBA::Octet major, + CORBA::Octet minor) { - return this->handler_; + this->messaging_object_->init (major, + minor); + return 1; } -void -TAO_SHMIOP_Client_Transport::close_connection (void) +int +TAO_SHMIOP_Transport::process_message (void) { - this->handler_->purge_entry (); - this->service_handler ()->handle_close (); -} + // Get the <message_type> that we have received + TAO_Pluggable_Message_Type t = + this->messaging_object_->message_type (); -// ********************************************************************* -ssize_t -TAO_SHMIOP_Transport::send (TAO_Stub *stub, - int two_way, - const ACE_Message_Block *message_block, - const ACE_Time_Value *max_wait_time) -{ - if (stub == 0 || two_way) + int result = 0; + if (t == TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION) { - return this->send (message_block, - max_wait_time); + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - %p\n"), + ACE_TEXT ("Close Connection Message recd \n"))); + + this->tms_->connection_closed (); } - else + else if (t == TAO_PLUGGABLE_MESSAGE_REQUEST) { - TAO_Sync_Strategy &sync_strategy = stub->sync_strategy (); - - return sync_strategy.send (*this, - *stub, - message_block, - max_wait_time); + if (this->messaging_object_->process_request_message (this, + this->orb_core ()) == -1) + return -1; } -} + else if (t == TAO_PLUGGABLE_MESSAGE_REPLY) + { + TAO_Pluggable_Reply_Params params (this->orb_core ()); + if (this->messaging_object_->process_reply_message (params) == -1) + { -ssize_t -TAO_SHMIOP_Transport::send (const ACE_Message_Block *message_block, - const ACE_Time_Value *max_wait_time, - size_t *) -{ - TAO_FUNCTION_PP_TIMEPROBE (TAO_SHMIOP_TRANSPORT_SEND_START); - return this->service_handler ()->peer ().send (message_block, - max_wait_time); -} + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - %p\n"), + ACE_TEXT ("SHMIOP_Transport::process_message, process_reply_message ()"))); -ssize_t -TAO_SHMIOP_Transport::send (const u_char *buf, - size_t len, - const ACE_Time_Value *max_wait_time) -{ - TAO_FUNCTION_PP_TIMEPROBE (TAO_SHMIOP_TRANSPORT_SEND_START); + this->messaging_object_->reset (); + this->tms_->connection_closed (); + return -1; + } - return this->service_handler ()->peer ().send (buf, - len, - max_wait_time); -} -ssize_t -TAO_SHMIOP_Transport::recv (char *buf, - size_t len, - const ACE_Time_Value *max_wait_time) -{ - TAO_FUNCTION_PP_TIMEPROBE (TAO_SHMIOP_TRANSPORT_RECEIVE_START); + result = + this->tms_->dispatch_reply (params); + + // @@ Somehow it seems dangerous to reset the state *after* + // dispatching the request, what if another threads receives + // another reply in the same connection? + // My guess is that it works as follows: + // - For the exclusive case there can be no such thread. + // - The the muxed case each thread has its own message_state. + // I'm pretty sure this comment is right. Could somebody else + // please look at it and confirm my guess? + + // @@ The above comment was found in the older versions of the + // code. The code was also written in such a way that, when + // the client thread on a call from handle_input () from the + // reactor a call would be made on the handle_client_input + // (). The implementation of handle_client_input () looked so + // flaky. It used to create a message state upon entry in to + // the function using the TMS and destroy that on exit. All + // this was fine _theoretically_ for multiple threads. But + // the flakiness was originating in the implementation of + // get_message_state () where we were creating message state + // only once and dishing it out for every thread till one of + // them destroy's it. So, it looked broken. That has been + // changed. Why?. To my knowledge, the reactor does not call + // handle_input () on two threads at the same time. So, IMHO + // that defeats the purpose of creating a message state for + // every thread. This is just my guess. If we run in to + // problems this place needs to be revisited. If someone else + // is going to take a look please contact bala@cs.wustl.edu + // for details on this-- Bala + + if (result == -1) + { + if (TAO_debug_level > 0) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) : SHMIOP_Client_Transport::") + ACE_TEXT ("handle_client_input - ") + ACE_TEXT ("dispatch reply failed\n"))); + this->messaging_object_->reset (); + this->tms_->connection_closed (); + return -1; + } - return this->service_handler ()->peer ().recv (buf, - len, - max_wait_time); -} + if (result == 0) + { + this->messaging_object_->reset (); + return 0; + } -// Default action to be taken for send request. -int -TAO_SHMIOP_Transport::send_request (TAO_Stub *, - TAO_ORB_Core * /* orb_core */, - TAO_OutputCDR & /* stream */, - int /* twoway */, - ACE_Time_Value * /* max_wait_time */) -{ - return -1; -} -CORBA::Boolean -TAO_SHMIOP_Transport::send_request_header (TAO_Operation_Details & /**/, - TAO_Target_Specification & /*spec */ , - TAO_OutputCDR & /*msg*/) -{ - // We should never be here. So return an error. - return 0; + // This is a NOOP for the Exclusive request case, but it actually + // destroys the stream in the muxed case. + //this->tms_->destroy_message_state (message_state); + } + else if (t == TAO_PLUGGABLE_MESSAGE_MESSAGERROR) + { + return -1; + } + + this->messaging_object_->reset (); + return 1; } diff --git a/TAO/tao/Strategies/SHMIOP_Transport.h b/TAO/tao/Strategies/SHMIOP_Transport.h index 04639559d70..4914b561b03 100644 --- a/TAO/tao/Strategies/SHMIOP_Transport.h +++ b/TAO/tao/Strategies/SHMIOP_Transport.h @@ -1,22 +1,15 @@ // This may look like C, but it's really -*- C++ -*- // $Id$ - -// ============================================================================ -// -// = LIBRARY -// TAO -// -// = FILENAME -// SHMIOP_Transport.h -// -// = DESCRIPTION -// IIOP Transport specific processing -// -// = AUTHOR -// Nanbor Wang <nanbor@cs.wustl.edu> -// -// ============================================================================ - +// =================================================================== +/** + * @file SHMIOP_Transport.h + * + * $Id$ + * + * @author Originally by Nanbor Wang <nanbor@cs.wustl.edu> + * @author Modified by Balachandran Natarajan <bala@cs.wustl.edu> + */ +// =================================================================== #ifndef TAO_SHMIOP_TRANSPORT_H #define TAO_SHMIOP_TRANSPORT_H #include "ace/pre.h" @@ -30,96 +23,96 @@ #if defined (TAO_HAS_SHMIOP) && (TAO_HAS_SHMIOP != 0) #include "strategies_export.h" - -#include "tao/Pluggable.h" -#include "tao/GIOP_Message_State.h" -#include "ace/MEM_Acceptor.h" -#include "ace/Synch.h" #include "ace/Svc_Handler.h" +#include "ace/Mem_Stream.h" +#include "tao/Pluggable.h" -// Forward decls. -class TAO_SHMIOP_Client_Connection_Handler; -class TAO_SHMIOP_Server_Connection_Handler; -class TAO_ORB_Core; +class TAO_SHMIOP_Connection_Handler; +class TAO_Pluggable_Messaging; +class TAO_Target_Specification; +class Tao_Operation_Details; typedef ACE_Svc_Handler<ACE_MEM_STREAM, ACE_NULL_SYNCH> - TAO_SHMIOP_SVC_HANDLER; + TAO_SHMIOP_SVC_HANDLER; + +/** + * @class TAO_SHMIOP_Transport + * + * @brief Specialization of the base TAO_Transport class to handle the + * SHMIOP protocol. + * + */ class TAO_Strategies_Export TAO_SHMIOP_Transport : public TAO_Transport { - // = TITLE - // This class acts as a bridge class to the transport specific - // connection handler (handler_). - // - // = DESCRIPTION - // Specialization of the base TAO_Transport class to handle the IIOP - // protocol. This class in turn will be further specialized for - // the client and server side. public: - TAO_SHMIOP_Transport (TAO_ORB_Core *orb_core); - // Base object's creator method. - ~TAO_SHMIOP_Transport (void); - // Default destructor. + /// Constructor. + TAO_SHMIOP_Transport (TAO_SHMIOP_Connection_Handler *handler, + TAO_ORB_Core *orb_core, + CORBA::Boolean flag); - // = The TAO_Transport methods, please check the documentation in - // "tao/Pluggable.h" for more details. + /// Default destructor. + ~TAO_SHMIOP_Transport (void); + /// Return the connection service handler + TAO_SHMIOP_SVC_HANDLER *service_handler (void); + /// The TAO_Transport methods, please check the documentation in + /// "tao/Pluggable.h" for more details. virtual ACE_HANDLE handle (void); + virtual ACE_Event_Handler *event_handler (void); + + virtual void close_connection (void); + + virtual int idle (void); + + /// Write the complete Message_Block chain to the connection. virtual ssize_t send (TAO_Stub *stub, int two_way, const ACE_Message_Block *mblk, const ACE_Time_Value *s = 0); + virtual ssize_t send (const ACE_Message_Block *mblk, const ACE_Time_Value *s = 0, size_t *bytes_transferred = 0); + + + /// Write the contents of the buffer of length len to the + /// connection. virtual ssize_t send (const u_char *buf, size_t len, const ACE_Time_Value *s = 0); + + /// Read len bytes from into buf. virtual ssize_t recv (char *buf, size_t len, const ACE_Time_Value *s = 0); + + /// Read and process the message from the connection. The processing + /// of the message is done by delegating the work to the underlying + /// messaging object + virtual int read_process_message (ACE_Time_Value *max_time_value = 0, + int block =0); + + virtual int register_handler (void); + + /// @@TODO: These methods IMHO should have more meaningful + /// names. The names seem to indicate nothing. virtual int send_request (TAO_Stub *stub, - TAO_ORB_Core *orb_core , + TAO_ORB_Core *orb_core, TAO_OutputCDR &stream, int twoway, ACE_Time_Value *max_wait_time); - virtual CORBA::Boolean - send_request_header (TAO_Operation_Details &opdetails, - TAO_Target_Specification &spec, - TAO_OutputCDR &msg); - - virtual TAO_SHMIOP_SVC_HANDLER *service_handler (void) = 0; - // Return the underlying the service handler -}; -class TAO_Strategies_Export TAO_SHMIOP_Client_Transport : public TAO_SHMIOP_Transport -{ - // = TITLE - // The Transport class used for Client side communication with a - // server. - // - // = DESCRIPTION - // Specialization of the TAO_SHMIOP_Transport class for client - // side. Methods related to sending one and two way requests - // lives here. -public: - TAO_SHMIOP_Client_Transport (TAO_SHMIOP_Client_Connection_Handler *handler, - TAO_ORB_Core *orb_core); - // Constructor. - - ~TAO_SHMIOP_Client_Transport (void); - // destructor - - // = The TAO_Transport methods, please check the documentation in - // "tao/Pluggable.h" for more details. - virtual void close_connection (void); - virtual int idle (void); + virtual int send_message (TAO_OutputCDR &stream, + TAO_Stub *stub = 0, + int twoway = 1, + ACE_Time_Value *max_time_wait = 0); virtual void start_request (TAO_ORB_Core *orb_core, - TAO_Target_Specification & /*spec */, + TAO_Target_Specification &spec, TAO_OutputCDR &output, CORBA::Environment &ACE_TRY_ENV = TAO_default_environment ()) ACE_THROW_SPEC ((CORBA::SystemException)); @@ -130,87 +123,33 @@ public: TAO_OutputCDR &output, CORBA::Environment &ACE_TRY_ENV = TAO_default_environment ()) ACE_THROW_SPEC ((CORBA::SystemException)); - virtual int send_request (TAO_Stub *stub, - TAO_ORB_Core *orb_core, - TAO_OutputCDR &stream, - int twoway, - ACE_Time_Value *max_wait_time); - virtual int handle_client_input (int block = 0, - ACE_Time_Value *max_time_value = 0); - virtual int register_handler (void); virtual CORBA::Boolean send_request_header (TAO_Operation_Details &opdetails, TAO_Target_Specification &spec, TAO_OutputCDR &msg); - virtual TAO_SHMIOP_SVC_HANDLER *service_handler (void); - - int messaging_init (CORBA::Octet major, - CORBA::Octet minor); - // Initialising the messaging object - - void use_lite (CORBA::Boolean flag); - // Sets the lite flag + /// Initialising the messaging object + virtual int messaging_init (CORBA::Octet major, + CORBA::Octet minor); private: - TAO_SHMIOP_Client_Connection_Handler *handler_; - // the connection service handler used for accessing lower layer - // communication protocols. - - TAO_Pluggable_Messaging *client_mesg_factory_; - // The message_factor instance specific for this particular - // transport protocol. + /// Process the message that we have read + int process_message (void); - TAO_ORB_Core *orb_core_; - // Our ORB core +private: - CORBA::Boolean lite_flag_; - // Are we using lite? + /// The connection service handler used for accessing lower layer + /// communication protocols. + TAO_SHMIOP_Connection_Handler *connection_handler_; - TAO_Pluggable_Reply_Params params_; - // The reply data that is sent back by the server + /// Our messaging object. + TAO_Pluggable_Messaging *messaging_object_; }; -// **************************************************************** - -class TAO_Strategies_Export TAO_SHMIOP_Server_Transport : public TAO_SHMIOP_Transport -{ - // = TITLE - // The Transport class used for server communication with a - // connected client. - // - // = DESCRIPTION - // Specialization of the TAO_SHMIOP_Transport class for the server side. - // methods for reading messages (requests) and sending replies live - // here. -public: - - TAO_SHMIOP_Server_Transport (TAO_SHMIOP_Server_Connection_Handler *handler, - TAO_ORB_Core *orb_core); - // Default creator method. - - ~TAO_SHMIOP_Server_Transport (void); - // Default destructor - - TAO_GIOP_Message_State message_state_; - // This keep the state of the current message, to enable - // non-blocking reads, fragment reassembly, etc. - // Please see Pluggable.h for documentation - virtual void close_connection (void); - virtual int idle (void); - - virtual TAO_SHMIOP_SVC_HANDLER *service_handler (void); - -private: - - TAO_SHMIOP_Server_Connection_Handler *handler_; - // the connection service handler used for accessing lower layer - // communication protocols. -}; #if defined (__ACE_INLINE__) #include "SHMIOP_Transport.i" diff --git a/TAO/tao/Strategies/SHMIOP_Transport.i b/TAO/tao/Strategies/SHMIOP_Transport.i index 7d806dec8ce..81bf354f364 100644 --- a/TAO/tao/Strategies/SHMIOP_Transport.i +++ b/TAO/tao/Strategies/SHMIOP_Transport.i @@ -1,8 +1,2 @@ // -*- C++ -*- //$Id$ - -ACE_INLINE void -TAO_SHMIOP_Client_Transport::use_lite (CORBA::Boolean flag) -{ - this->lite_flag_ = flag; -} diff --git a/TAO/tao/Strategies/UIOP_Connection_Handler.cpp b/TAO/tao/Strategies/UIOP_Connection_Handler.cpp new file mode 100644 index 00000000000..5844199f791 --- /dev/null +++ b/TAO/tao/Strategies/UIOP_Connection_Handler.cpp @@ -0,0 +1,314 @@ +// $Id$ + +#include "UIOP_Connect.h" + +#if TAO_HAS_UIOP == 1 + +#include "UIOP_Transport.h" +#include "tao/debug.h" +#include "tao/ORB_Core.h" +#include "tao/ORB.h" +#include "tao/CDR.h" +#include "tao/Timeprobe.h" +#include "tao/Server_Strategy_Factory.h" +#include "tao/Messaging_Policy_i.h" +#include "UIOP_Endpoint.h" +#include "tao/Base_Connection_Property.h" + +#if !defined (__ACE_INLINE__) +# include "UIOP_Connection_Handler.inl" +#endif /* ! __ACE_INLINE__ */ + +ACE_RCSID(Strategies, UIOP_Connect, "$Id$") + +#include "tao/GIOP_Message_Acceptors.h" +#include "tao/GIOP_Message_Lite.h" + + + +TAO_UIOP_Connection_Handler::TAO_UIOP_Connection_Handler (ACE_Thread_Manager *t) + : TAO_UIOP_SVC_HANDLER (t, 0 , 0), + TAO_Connection_Handler (0), + transport_ (this, 0, 0), + refcount_ (1), + tcp_properties_ (0) +{ + // This constructor should *never* get called, it is just here to + // make the compiler happy: the default implementation of the + // Creation_Strategy requires a constructor with that signature, we + // don't use that implementation, but some (most?) compilers + // instantiate it anyway. + ACE_ASSERT (this->orb_core () != 0); +} + + +TAO_UIOP_Connection_Handler::TAO_UIOP_Connection_Handler (TAO_ORB_Core *orb_core, + CORBA::Boolean flag, + void *arg) + : TAO_UIOP_SVC_HANDLER (orb_core->thr_mgr (), 0, 0), + TAO_Connection_Handler (orb_core), + transport_ (this, orb_core, flag), + refcount_ (1), + tcp_properties_ (ACE_static_cast + (TAO_UIOP_Properties *, arg)) +{ +} + + +TAO_UIOP_Connection_Handler::~TAO_UIOP_Connection_Handler (void) +{ + + // If the socket has not already been closed. + if (this->get_handle () != ACE_INVALID_HANDLE) + { + // Cannot deal with errors, and therefore they are ignored. + this->transport_.send_buffered_messages (); + } + else + { + // Dequeue messages and delete message blocks. + this->transport_.dequeue_all (); + } +} + + +// @@ Should I do something here to enable non-blocking?? (Alex). +// @@ Alex: I don't know if this is the place to do it, but the way to +// do it is: +// if (this->peer ().enable (ACE_NONBLOCK) == -1) +// return -1; +// Probably we will need to use the transport to decide if it is +// needed or not. + +int +TAO_UIOP_Connection_Handler::open (void*) +{ + if (this->set_socket_option (this->peer (), + this->tcp_properties_->send_buffer_size, + this->tcp_properties_->recv_buffer_size) == -1) + return -1; + +#if !defined (ACE_LACKS_TCP_NODELAY) + + if (this->peer ().set_option (ACE_IPPROTO_TCP, + TCP_NODELAY, + (void *) &tcp_properties_->no_delay, + sizeof (int)) == -1) + return -1; +#endif /* ! ACE_LACKS_TCP_NODELAY */ + + // Called by the <Strategy_Acceptor> when the handler is + // completely connected. + ACE_INET_Addr addr; + + char client[MAXHOSTNAMELEN + 16]; + + // Get the peername. + if (this->peer ().get_remote_addr (addr) == -1) + return -1; + + // Verify that we can resolve the peer hostname. + else if (addr.addr_to_string (client, sizeof (client)) == -1) + return -1; + + if (TAO_debug_level > 0) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) UIOP connection from client") + ACE_TEXT ("<%s> on %d\n"), + client, this->peer ().get_handle ())); + } + + return 0; +} + +int +TAO_UIOP_Connection_Handler::activate (long flags, + int n_threads, + int force_active, + long priority, + int grp_id, + ACE_Task_Base *task, + ACE_hthread_t thread_handles[], + void *stack[], + size_t stack_size[], + ACE_thread_t thread_names[]) +{ + if (TAO_debug_level) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) UIOP_Connection_Handler::activate %d ") + ACE_TEXT ("threads, flags = %d\n"), + n_threads, + flags, + THR_BOUND)); + + return TAO_UIOP_SVC_HANDLER::activate (flags, + n_threads, + force_active, + priority, + grp_id, + task, + thread_handles, + stack, + stack_size, + thread_names); +} + +int +TAO_UIOP_Connection_Handler::svc (void) +{ + // This method is called when an instance is "activated", i.e., + // turned into an active object. Presumably, activation spawns a + // thread with this method as the "worker function". + + // Call the implementation here + return this->svc_i (); +} + + +int +TAO_UIOP_Connection_Handler::handle_close (ACE_HANDLE handle, + ACE_Reactor_Mask rm) +{ + // @@ Alex: we need to figure out if the transport decides to close + // us or something else. If it is something else (for example + // the cached connector trying to make room for other + // connections) then we should let the transport know, so it can + // in turn take appropiate action (such as sending exceptions to + // all waiting reply handlers). + if (TAO_debug_level) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) ") + ACE_TEXT ("UIOP_Connection_Handler::handle_close ") + ACE_TEXT ("(%d, %d)\n"), + handle, + rm)); + + --this->refcount_; + if (this->refcount_ == 0 && + this->is_registered ()) + { + // Make sure there are no timers. + this->reactor ()->cancel_timer (this); + + // Set the flag to indicate that it is no longer registered with + // the reactor, so that it isn't included in the set that is + // passed to the reactor on ORB destruction. + this->is_registered (0); + + // Decrement the reference count + this->decr_ref_count (); + } + + return 0; +} + +ACE_HANDLE +TAO_UIOP_Connection_Handler::fetch_handle (void) +{ + return this->get_handle (); +} + + +int +TAO_UIOP_Connection_Handler::handle_timeout (const ACE_Time_Value &, + const void *) +{ + // This method is called when buffering timer expires. + // + ACE_Time_Value *max_wait_time = 0; + + TAO_Stub *stub = 0; + int has_timeout; + this->orb_core ()->call_timeout_hook (stub, + has_timeout, + *max_wait_time); + + // Cannot deal with errors, and therefore they are ignored. + this->transport ()->send_buffered_messages (max_wait_time); + + return 0; +} + + +int +TAO_UIOP_Connection_Handler::close (u_long) +{ + this->destroy (); + + return 0; +} + + +int +TAO_UIOP_Connection_Handler::add_handler_to_cache (void) +{ + ACE_INET_Addr addr; + + // Get the peername. + if (this->peer ().get_remote_addr (addr) == -1) + return -1; + + // Construct an UIOP_Endpoint object + TAO_UIOP_Endpoint endpoint (addr); + + // Construct a property object + TAO_Base_Connection_Property prop (&endpoint); + + // Add the handler to Cache + return this->orb_core ()->connection_cache ().cache_handler (&prop, + this); +} + + +int +TAO_UIOP_Connection_Handler::handle_input (ACE_HANDLE h) +{ + return this->handle_input_i (h); +} + + +int +TAO_UIOP_Connection_Handler::handle_input_i (ACE_HANDLE, + ACE_Time_Value *max_wait_time) +{ + this->refcount_++; + + // Call the transport read the message + int result = this->transport_.read_process_message (max_wait_time); + + // Now the message has been read + if (result == -1 && TAO_debug_level > 0) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - %p\n"), + ACE_TEXT ("UIOP_Connection_Handler::read_message \n"))); + + } + + // The upcall is done. Bump down the reference count + --this->refcount_; + if (this->refcount_ == 0) + this->decr_ref_count (); + + if (result == 0 || result == -1) + { + return result; + } + + return 0; +} + + + +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) + +template class ACE_Svc_Handler<ACE_LSOCK_STREAM, ACE_NULL_SYNCH>; + +#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) + +#pragma instantiate ACE_Svc_Handler<ACE_LSOCK_STREAM, ACE_NULL_SYNCH> + +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ + +#endif /*TAO_HAS_UIOP == 1*/ diff --git a/TAO/tao/Strategies/UIOP_Connection_Handler.h b/TAO/tao/Strategies/UIOP_Connection_Handler.h new file mode 100644 index 00000000000..eb39aacc0ee --- /dev/null +++ b/TAO/tao/Strategies/UIOP_Connection_Handler.h @@ -0,0 +1,160 @@ +// This may look like C, but it's really -*- C++ -*- + +// =================================================================== +/** + * @file UIOP_Connection_Handler.h + * + * $Id$ + * + * @author Originally by Ossama Othman <ossama@ece.uci.edu> as + * UIOP_Connect.h + * @author modified by Balachandran Natarajan <bala@cs.wustl.edu> + */ +// =================================================================== +#ifndef TAO_UIOP_CONNECTION_HANDLER_H +#define TAO_UIOP_CONNECTION_HANDLER_H +#include "ace/pre.h" + +#include "UIOP_Transport.h" + +#if TAO_HAS_UIOP == 1 + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +#pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "ace/Acceptor.h" +#include "ace/Reactor.h" +#include "tao/Connection_Handler.h" +#include "tao/corbafwd.h" +#include "tao/Wait_Strategy.h" + + +// Forward Decls +class TAO_Pluggable_Messaging; + +/** + * @class TAO_IIOP_Properties + * + * @brief Unix Domain Sockets protocol properties specification for a + * set of connections. + */ + +class TAO_Strategies_Export TAO_UIOP_Properties +{ +public: + + int send_buffer_size; + int recv_buffer_size; +}; + +// **************************************************************** + +/** + * @class TAO_IIOP_Connection_Handler + * + * @brief Handles requests on a single connection. + * + * The Connection handler which is common for the Acceptor and + * the Connector + */ + + +class TAO_Strategies_Export TAO_UIOP_Connection_Handler : public TAO_UIOP_SVC_HANDLER, + public TAO_Connection_Handler +{ + +public: + + TAO_UIOP_Connection_Handler (ACE_Thread_Manager* t = 0); + + /// Constructor. <arg> parameter is used by the Acceptor to pass the + /// protocol configuration properties for this connection. + TAO_UIOP_Connection_Handler (TAO_ORB_Core *orb_core, + CORBA::Boolean flag, + void *arg); + + + /// Destructor. + ~TAO_UIOP_Connection_Handler (void); + + /// Called by the <Strategy_Acceptor> when the handler is completely + /// connected. Argument is unused. + virtual int open (void *); + + + /// = Active object activation method. + virtual int activate (long flags = THR_NEW_LWP, + int n_threads = 1, + int force_active = 0, + long priority = ACE_DEFAULT_THREAD_PRIORITY, + int grp_id = -1, + ACE_Task_Base *task = 0, + ACE_hthread_t thread_handles[] = 0, + void *stack[] = 0, + size_t stack_size[] = 0, + ACE_thread_t thread_names[] = 0); + + /// Only used when the handler is turned into an active object by + /// calling <activate>. This serves as the event loop in such cases. + virtual int svc (void); + + /// Perform appropriate closing. + virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE, + ACE_Reactor_Mask = ACE_Event_Handler::NULL_MASK); + + /// Return the underlying transport object + TAO_Transport *transport (void); + + /// Return the underlying handle + virtual ACE_HANDLE fetch_handle (void); + + /// Called when buffering timer expires. + virtual int handle_timeout (const ACE_Time_Value &tv, + const void *arg = 0); + + /// Object termination hook. + virtual int close (u_long flags = 0); + + /// Add ourselves to Cache. + int add_handler_to_cache (void); + +protected: + + /// = Event Handler overloads + + /// Reads a message from the <peer()>, dispatching and servicing it + /// appropriately. + /// handle_input() just delegates on handle_input_i() which timeouts + /// after <max_wait_time>, this is used in thread-per-connection to + /// ensure that server threads eventually exit. + + virtual int handle_input (ACE_HANDLE = ACE_INVALID_HANDLE); + virtual int handle_input_i (ACE_HANDLE = ACE_INVALID_HANDLE, + ACE_Time_Value *max_wait_time = 0); + +private: + + /// Transport object reference. + TAO_UIOP_Transport transport_; + + /// Reference count.It is used to count nested upcalls on this + /// svc_handler i.e., the connection can close during nested upcalls, + /// you should not delete the svc_handler until the stack unwinds + /// from the nested upcalls. + u_long refcount_; + + /// TCP configuration for this connection. + TAO_UIOP_Properties *tcp_properties_; +}; + + + +#if defined (__ACE_INLINE__) +#include "UIOP_Connection_Handler.inl" +#endif /* __ACE_INLINE__ */ + +#endif /* TAO_HAS_UIOP == 1 */ + +#include "ace/post.h" +#endif /* TAO_UIOP_CONNECT_H */ diff --git a/TAO/tao/Strategies/UIOP_Connection_Handler.inl b/TAO/tao/Strategies/UIOP_Connection_Handler.inl new file mode 100644 index 00000000000..c79da486e4d --- /dev/null +++ b/TAO/tao/Strategies/UIOP_Connection_Handler.inl @@ -0,0 +1,19 @@ +// -*- C++ -*- +// $Id$ + +#if TAO_HAS_UIOP == 1 + + +ACE_INLINE TAO_Transport * +TAO_UIOP_Server_Connection_Handler::transport (void) +{ + return &(this->transport_); +} + +ACE_INLINE TAO_Transport * +TAO_UIOP_Client_Connection_Handler::transport (void) +{ + return &(this->transport_); +} + +#endif /* TAO_HAS_UIOP == 1 */ diff --git a/TAO/tao/Strategies/UIOP_Transport.cpp b/TAO/tao/Strategies/UIOP_Transport.cpp index fc22a84665f..885535e01d9 100644 --- a/TAO/tao/Strategies/UIOP_Transport.cpp +++ b/TAO/tao/Strategies/UIOP_Transport.cpp @@ -18,463 +18,413 @@ ACE_RCSID (Strategies, UIOP_Transport, "$Id$") #include "tao/Stub.h" #include "tao/ORB_Core.h" #include "tao/debug.h" -#include "tao/GIOP_Message_Connectors.h" +#include "tao/GIOP_Message_Base.h" #include "tao/GIOP_Message_Lite.h" #if !defined (__ACE_INLINE__) # include "UIOP_Transport.i" #endif /* ! __ACE_INLINE__ */ -#if defined (ACE_ENABLE_TIMEPROBES) -static const char *TAO_UIOP_Transport_Timeprobe_Description[] = - { - "UIOP_Transport::send - start", - "UIOP_Transport::send - end", - "UIOP_Transport::receive - start", - "UIOP_Transport::receive - end", - - "UIOP_Client_Transport::start_request - start", - "UIOP_Client_Transport::start_request - end" - }; - -enum - { - TAO_UIOP_TRANSPORT_SEND_START = 1300, - TAO_UIOP_TRANSPORT_SEND_END, - - TAO_UIOP_TRANSPORT_RECEIVE_START, - TAO_UIOP_TRANSPORT_RECEIVE_END, - - TAO_UIOP_CLIENT_TRANSPORT_START_REQUEST_START, - TAO_UIOP_CLIENT_TRANSPORT_START_REQUEST_END - }; - - -// Setup Timeprobes -ACE_TIMEPROBE_EVENT_DESCRIPTIONS (TAO_UIOP_Transport_Timeprobe_Description, - TAO_UIOP_TRANSPORT_SEND_START); - -#endif /* ACE_ENABLE_TIMEPROBES */ - -TAO_UIOP_Transport::TAO_UIOP_Transport (TAO_ORB_Core *orb_core) +TAO_UIOP_Transport::TAO_UIOP_Transport (TAO_UIOP_Connection_Handler *handler, + TAO_ORB_Core *orb_core, + CORBA::Boolean flag) : TAO_Transport (TAO_TAG_UIOP_PROFILE, - orb_core) + orb_core), + connection_handler_ (handler), + messaging_object_ (0) { + if (flag) + { + // Use the lite version of the protocol + ACE_NEW (this->messaging_object_, + TAO_GIOP_Message_Lite (orb_core)); + } + else + { + // Use the normal GIOP object + ACE_NEW (this->messaging_object_, + TAO_GIOP_Message_Base (orb_core)); + } } TAO_UIOP_Transport::~TAO_UIOP_Transport (void) { + delete this->messaging_object_; } - +TAO_UIOP_SVC_HANDLER * +TAO_UIOP_Transport::service_handler (void) +{ + return this->connection_handler_; +} ACE_HANDLE TAO_UIOP_Transport::handle (void) { - return this->service_handler ()->get_handle (); + return this->connection_handler_->get_handle (); } ACE_Event_Handler * TAO_UIOP_Transport::event_handler (void) { - return this->service_handler (); + return this->connection_handler_; } -// **************************************************************** - -TAO_UIOP_Server_Transport:: - TAO_UIOP_Server_Transport (TAO_UIOP_Server_Connection_Handler *handler, - TAO_ORB_Core* orb_core) - : TAO_UIOP_Transport (orb_core), - message_state_ (orb_core), - handler_ (handler) +void +TAO_UIOP_Transport::close_connection (void) { -} + // Now close the handler + this->connection_handler_->handle_close (); -TAO_UIOP_Server_Transport::~TAO_UIOP_Server_Transport (void) -{ + // Purge the entry from the Cache map first and then close the + // handler + this->connection_handler_->purge_entry (); } int -TAO_UIOP_Server_Transport::idle (void) +TAO_UIOP_Transport::idle (void) { - return this->handler_->make_idle (); + return this->connection_handler_->make_idle (); } -TAO_UIOP_SVC_HANDLER * -TAO_UIOP_Server_Transport::service_handler (void) + +ssize_t +TAO_UIOP_Transport::send (TAO_Stub *stub, + int two_way, + const ACE_Message_Block *message_block, + const ACE_Time_Value *max_wait_time) { - return this->handler_; + if (stub == 0 || two_way) + { + return this->send (message_block, + max_wait_time); + } + else + { + TAO_Sync_Strategy &sync_strategy = stub->sync_strategy (); + + return sync_strategy.send (*this, + *stub, + message_block, + max_wait_time); + } } -void -TAO_UIOP_Server_Transport::close_connection (void) + +ssize_t +TAO_UIOP_Transport::send (const ACE_Message_Block *message_block, + const ACE_Time_Value *max_wait_time, + size_t *bytes_transferred) { - // Purge the handler entry from the Connection Cache - this->handler_->purge_entry (); - this->handler_->handle_close (); + return ACE::send_n (this->handle (), + message_block, + max_wait_time, + bytes_transferred); } -// **************************************************************** - -TAO_UIOP_Client_Transport:: - TAO_UIOP_Client_Transport (TAO_UIOP_Client_Connection_Handler *handler, - TAO_ORB_Core *orb_core) - : TAO_UIOP_Transport (orb_core), - handler_ (handler), - client_mesg_factory_ (0), - orb_core_ (orb_core), - lite_flag_ (0), - params_ () +ssize_t +TAO_UIOP_Transport::send (const u_char *buf, + size_t len, + const ACE_Time_Value *max_wait_time) { + return this->service_handler ()->peer ().send_n (buf, + len, + max_wait_time); } -TAO_UIOP_Client_Transport::~TAO_UIOP_Client_Transport (void) +ssize_t +TAO_UIOP_Transport::recv (char *buf, + size_t len, + const ACE_Time_Value *max_wait_time) { - delete this->client_mesg_factory_; + return this->service_handler ()->peer ().recv_n (buf, + len, + max_wait_time); } + int -TAO_UIOP_Client_Transport::idle (void) +TAO_UIOP_Transport::read_process_message (ACE_Time_Value *max_wait_time, + int block) { - return this->handler_->make_idle (); -} + // Read the message of the socket + int result = this->messaging_object_->read_message (this, + block, + max_wait_time); -void -TAO_UIOP_Client_Transport::start_request (TAO_ORB_Core * /*orb_core*/, - TAO_Target_Specification & /*spec*/, - TAO_OutputCDR &output, - CORBA::Environment &ACE_TRY_ENV) - ACE_THROW_SPEC ((CORBA::SystemException)) -{ - TAO_FUNCTION_PP_TIMEPROBE (TAO_UIOP_CLIENT_TRANSPORT_START_REQUEST_START); + if (result == -1) + { + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - %p\n"), + ACE_TEXT ("UIOP_Transport::read_message, failure in read_message ()"))); + + this->tms_->connection_closed (); + return -1; + } + if (result == 0) + return result; + + // Now we know that we have been able to read the complete message + // here.. + return this->process_message (); - if (this->client_mesg_factory_->write_protocol_header - (TAO_PLUGGABLE_MESSAGE_REQUEST, - output) == 0) - ACE_THROW (CORBA::MARSHAL ()); } -void -TAO_UIOP_Client_Transport::start_locate (TAO_ORB_Core * /*orb_core*/, - TAO_Target_Specification &spec, - TAO_Operation_Details &opdetails, - TAO_OutputCDR &output, - CORBA::Environment &ACE_TRY_ENV) - ACE_THROW_SPEC ((CORBA::SystemException)) + +int +TAO_UIOP_Transport::register_handler (void) { - if (this->client_mesg_factory_->write_protocol_header - (TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST, - output) == 0) - ACE_THROW (CORBA::MARSHAL ()); + // @@ It seems like this method should go away, the right reactor is + // picked at object creation time. + ACE_Reactor *r = this->orb_core_->reactor (); - if (this->client_mesg_factory_->write_message_header (opdetails, - TAO_PLUGGABLE_MESSAGE_LOCATE_REQUEST_HEADER, - spec, - output) == 0) - ACE_THROW (CORBA::MARSHAL ()); + if (r == this->connection_handler_->reactor ()) + return 0; + + // About to be registered with the reactor, so bump the ref + // count + this->connection_handler_->incr_ref_count (); + + // Set the flag in the Connection Handler + this->connection_handler_->is_registered (1); + + + // Register the handler with the reactor + return r->register_handler (this->connection_handler_, + ACE_Event_Handler::READ_MASK); } int -TAO_UIOP_Client_Transport::send_request (TAO_Stub *stub, - TAO_ORB_Core *orb_core, - TAO_OutputCDR &stream, - int two_way, - ACE_Time_Value *max_wait_time) +TAO_UIOP_Transport::send_request (TAO_Stub *stub, + TAO_ORB_Core *orb_core, + TAO_OutputCDR &stream, + int two_way, + ACE_Time_Value *max_wait_time) { if (this->ws_->sending_request (orb_core, two_way) == -1) return -1; - if (this->client_mesg_factory_->send_message (this, - stream, - max_wait_time, - stub, - two_way) == -1) + if (this->send_message (stream, + stub, + two_way, + max_wait_time) == -1) + return -1; return this->idle_after_send (); } -// Return 0, when the reply is not read fully, 1 if it is read fully. -// @@ This code should go in the TAO_Transport class is repeated for -// each transport!! int -TAO_UIOP_Client_Transport::handle_client_input (int /* block */, - ACE_Time_Value *max_wait_time) +TAO_UIOP_Transport::send_message (TAO_OutputCDR &stream, + TAO_Stub *stub, + int twoway, + ACE_Time_Value *max_wait_time) { + // Format the message in the stream first + if (this->messaging_object_->format_message (stream) != 0) + return -1; - // Notice that the message_state is only modified in one thread at a - // time because the reactor does not call handle_input() for the - // same Event_Handler in two threads at the same time. + // Strictly speaking, should not need to loop here because the + // socket never gets set to a nonblocking mode ... some Linux + // versions seem to need it though. Leaving it costs little. - // Get the message state from the Transport Mux Strategy. - TAO_GIOP_Message_State* message_state = - this->tms_->get_message_state (); + // This guarantees to send all data (bytes) or return an error. + ssize_t n = this->send (stub, + twoway, + stream.begin (), + max_wait_time); - if (message_state == 0) + if (n == -1) { - if (TAO_debug_level > 0) + if (TAO_debug_level) ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) UIOP_Transport::handle_client_input -" - " nil message state\n")); - this->tms_->connection_closed (); - return -1; - } + ACE_TEXT ("TAO: (%P|%t|%N|%l) closing conn %d after fault %p\n"), + this->handle (), + ACE_TEXT ("send_message ()\n"))); - int result = this->client_mesg_factory_->handle_input (this, - this->orb_core_, - *message_state, - max_wait_time); - if (result == -1) - { - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - %p\n", - "UIOP_Transport::handle_client_input, handle_input")); - this->tms_->connection_closed (); return -1; } - if (result == 0) - return result; - - // OK, the complete message is here... - result = this->client_mesg_factory_->parse_reply (*message_state, - this->params_); - if (result == -1) + // EOF. + if (n == 0) { - if (TAO_debug_level > 0) + if (TAO_debug_level) ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - %p\n", - "UIOP_Transport::handle_client_input, parse reply")); - message_state->reset (); - this->tms_->connection_closed (); + ACE_TEXT ("TAO: (%P|%t|%N|%l) send_message () \n") + ACE_TEXT ("EOF, closing conn %d\n"), + this->handle())); return -1; } - result = - this->tms_->dispatch_reply (this->params_.request_id_, - this->params_.reply_status_, - message_state->giop_version, - this->params_.svc_ctx_, - message_state); - - if (result == -1) - { - if (TAO_debug_level > 0) - ACE_ERROR ((LM_ERROR, - "TAO (%P|%t) : UIOP_Client_Transport::" - "handle_client_input - " - "dispatch reply failed\n")); - message_state->reset (); - this->tms_->connection_closed (); - return -1; - } - - if (result == 0) - { - message_state->reset (); - return 0; - } - - // This is a NOOP for the Exclusive request case, but it actually - // destroys the stream in the muxed case. - this->tms_->destroy_message_state (message_state); - - // Return something to indicate the reply is received. - return result; + return 1; } -int -TAO_UIOP_Client_Transport::register_handler (void) -{ - // @@ It seems like this method should go away, the right reactor is - // picked at object creation time. - ACE_Reactor *r = this->orb_core ()->reactor (); - if (r == this->service_handler ()->reactor ()) - return 0; - // About to be registered with the reactor, so bump the ref - // count - this->handler_->incr_ref_count (); +void +TAO_UIOP_Transport::start_request (TAO_ORB_Core * /*orb_core*/, + TAO_Target_Specification & /*spec */, + TAO_OutputCDR & /*output */, + CORBA::Environment & /*ACE_TRY_ENV*/) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + // TAO_FUNCTION_PP_TIMEPROBE (TAO_UIOP_CLIENT_TRANSPORT_START_REQUEST_START); - // Set the flag in the Connection Handler - this->handler_->is_registered (1); + // @@ This method should NO longer be required.. - return r->register_handler (this->handler_, - ACE_Event_Handler::READ_MASK); + /* if (this->client_mesg_factory_->write_protocol_header + (TAO_PLUGGABLE_MESSAGE_REQUEST, + output) == 0) + ACE_THROW (CORBA::MARSHAL ());*/ } -TAO_UIOP_SVC_HANDLER * -TAO_UIOP_Client_Transport::service_handler (void) -{ - return this->handler_; -} -int -TAO_UIOP_Client_Transport:: - messaging_init (CORBA::Octet major, - CORBA::Octet minor) +void +TAO_UIOP_Transport::start_locate (TAO_ORB_Core * /*orb_core*/, + TAO_Target_Specification &spec, + TAO_Operation_Details &opdetails, + TAO_OutputCDR &output, + CORBA::Environment &ACE_TRY_ENV) + ACE_THROW_SPEC ((CORBA::SystemException)) { - if (this->client_mesg_factory_ == 0) - { - if (this->lite_flag_) - { - ACE_NEW_RETURN (this->client_mesg_factory_, - TAO_GIOP_Message_Lite (this->orb_core_), - -1); - } - else if (major == TAO_DEF_GIOP_MAJOR) - { - if (minor > TAO_DEF_GIOP_MINOR) - minor = TAO_DEF_GIOP_MINOR; - switch (minor) - { - case 0: - ACE_NEW_RETURN (this->client_mesg_factory_, - TAO_GIOP_Message_Connector_10, - 0); - break; - case 1: - ACE_NEW_RETURN (this->client_mesg_factory_, - TAO_GIOP_Message_Connector_11, - 0); - break; - case 2: - ACE_NEW_RETURN (this->client_mesg_factory_, - TAO_GIOP_Message_Connector_12, - 0); - break; - default: - if (TAO_debug_level > 0) - { - ACE_ERROR_RETURN ((LM_ERROR, - ACE_TEXT ("(%N|%l|%p|%t) No matching minor version number \n")), - 0); - } - } - } - else - { - if (TAO_debug_level > 0) - { - ACE_ERROR_RETURN ((LM_ERROR, - ACE_TEXT ("(%N|%l|%p|%t) No matching major version number \n")), - 0); - } - } - } - - return 1; + if (this->messaging_object_->generate_locate_request_header (opdetails, + spec, + output) == -1) + ACE_THROW (CORBA::MARSHAL ()); } CORBA::Boolean -TAO_UIOP_Client_Transport::send_request_header (TAO_Operation_Details &opdetails, - TAO_Target_Specification &spec, - TAO_OutputCDR & msg) +TAO_UIOP_Transport::send_request_header (TAO_Operation_Details &opdetails, + TAO_Target_Specification &spec, + TAO_OutputCDR &msg) { // We are going to pass on this request to the underlying messaging // layer. It should take care of this request - CORBA::Boolean retval = - this->client_mesg_factory_->write_message_header (opdetails, - TAO_PLUGGABLE_MESSAGE_REQUEST_HEADER, - spec, - msg); - return retval; + if (this->messaging_object_->generate_request_header (opdetails, + spec, + msg) == -1) + return 0; + + return 1; } -void -TAO_UIOP_Client_Transport::close_connection (void) +int +TAO_UIOP_Transport::messaging_init (CORBA::Octet major, + CORBA::Octet minor) { - // Purge the handler entry from the Connection Cache - this->handler_->purge_entry (); - this->handler_->handle_close (); + this->messaging_object_->init (major, + minor); + return 1; } -// **************************************************************** - -ssize_t -TAO_UIOP_Transport::send (TAO_Stub *stub, - int two_way, - const ACE_Message_Block *message_block, - const ACE_Time_Value *max_wait_time) +int +TAO_UIOP_Transport::process_message (void) { - if (stub == 0 || two_way) + // Get the <message_type> that we have received + TAO_Pluggable_Message_Type t = + this->messaging_object_->message_type (); + + + int result = 0; + if (t == TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION) { - return this->send (message_block, - max_wait_time); + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - %p\n"), + ACE_TEXT ("Close Connection Message recd \n"))); + + this->tms_->connection_closed (); } - else + else if (t == TAO_PLUGGABLE_MESSAGE_REQUEST) { - TAO_Sync_Strategy &sync_strategy = stub->sync_strategy (); - - return sync_strategy.send (*this, - *stub, - message_block, - max_wait_time); + if (this->messaging_object_->process_request_message (this, + this->orb_core ()) == -1) + return -1; } -} + else if (t == TAO_PLUGGABLE_MESSAGE_REPLY) + { + TAO_Pluggable_Reply_Params params (this->orb_core ()); + if (this->messaging_object_->process_reply_message (params) == -1) + { -ssize_t -TAO_UIOP_Transport::send (const ACE_Message_Block *message_block, - const ACE_Time_Value *max_wait_time, - size_t *bytes_transferred) -{ - TAO_FUNCTION_PP_TIMEPROBE (TAO_UIOP_TRANSPORT_SEND_START); + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - %p\n"), + ACE_TEXT ("UIOP_Transport::process_message, process_reply_message ()"))); - return ACE::send_n (this->handle (), - message_block, - max_wait_time, - bytes_transferred); -} + this->messaging_object_->reset (); + this->tms_->connection_closed (); + return -1; + } -ssize_t -TAO_UIOP_Transport::send (const u_char *buf, - size_t len, - const ACE_Time_Value *max_wait_time) -{ - TAO_FUNCTION_PP_TIMEPROBE (TAO_UIOP_TRANSPORT_SEND_START); - return this->service_handler ()->peer ().send_n (buf, - len, - max_wait_time); -} + result = + this->tms_->dispatch_reply (params); + + // @@ Somehow it seems dangerous to reset the state *after* + // dispatching the request, what if another threads receives + // another reply in the same connection? + // My guess is that it works as follows: + // - For the exclusive case there can be no such thread. + // - The the muxed case each thread has its own message_state. + // I'm pretty sure this comment is right. Could somebody else + // please look at it and confirm my guess? + + // @@ The above comment was found in the older versions of the + // code. The code was also written in such a way that, when + // the client thread on a call from handle_input () from the + // reactor a call would be made on the handle_client_input + // (). The implementation of handle_client_input () looked so + // flaky. It used to create a message state upon entry in to + // the function using the TMS and destroy that on exit. All + // this was fine _theoretically_ for multiple threads. But + // the flakiness was originating in the implementation of + // get_message_state () where we were creating message state + // only once and dishing it out for every thread till one of + // them destroy's it. So, it looked broken. That has been + // changed. Why?. To my knowledge, the reactor does not call + // handle_input () on two threads at the same time. So, IMHO + // that defeats the purpose of creating a message state for + // every thread. This is just my guess. If we run in to + // problems this place needs to be revisited. If someone else + // is going to take a look please contact bala@cs.wustl.edu + // for details on this-- Bala + + if (result == -1) + { + if (TAO_debug_level > 0) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) : UIOP_Client_Transport::") + ACE_TEXT ("handle_client_input - ") + ACE_TEXT ("dispatch reply failed\n"))); + this->messaging_object_->reset (); + this->tms_->connection_closed (); + return -1; + } -ssize_t -TAO_UIOP_Transport::recv (char *buf, - size_t len, - const ACE_Time_Value *max_wait_time) -{ - TAO_FUNCTION_PP_TIMEPROBE (TAO_UIOP_TRANSPORT_RECEIVE_START); + if (result == 0) + { + this->messaging_object_->reset (); + return 0; + } - return this->service_handler ()->peer ().recv_n (buf, - len, - max_wait_time); -} -// Default action to be taken for send request. -int -TAO_UIOP_Transport::send_request (TAO_Stub *, - TAO_ORB_Core * /* orb_core */, - TAO_OutputCDR & /* stream */, - int /* twoway */, - ACE_Time_Value * /* max_wait_time */) -{ - return -1; -} + // This is a NOOP for the Exclusive request case, but it actually + // destroys the stream in the muxed case. + //this->tms_->destroy_message_state (message_state); + } + else if (t == TAO_PLUGGABLE_MESSAGE_MESSAGERROR) + { + return -1; + } -CORBA::Boolean -TAO_UIOP_Transport::send_request_header (TAO_Operation_Details & /*opdetails*/, - TAO_Target_Specification & /*spec*/, - TAO_OutputCDR & /*msg*/) -{ - // We should never be here. So return an error. - return 0; + this->messaging_object_->reset (); + return 1; } #endif /* TAO_HAS_UIOP */ diff --git a/TAO/tao/Strategies/UIOP_Transport.h b/TAO/tao/Strategies/UIOP_Transport.h index 485c08721ab..de4a2b3726d 100644 --- a/TAO/tao/Strategies/UIOP_Transport.h +++ b/TAO/tao/Strategies/UIOP_Transport.h @@ -1,23 +1,16 @@ // This may look like C, but it's really -*- C++ -*- -// $Id$ - - -// ============================================================================ -// -// = LIBRARY -// TAO -// -// = FILENAME -// UIOP_Transport.h -// -// = DESCRIPTION -// UIOP Transport specific processing // -// = AUTHOR -// Fred Kuhns <fredk@cs.wustl.edu> -// Ossama Othman <othman@cs.wustl.edu> -// -// ============================================================================ +// =================================================================== +/** + * @file UIOP_Transport.h + * + * $Id$ + * + * @author Originally by Fred Kuhns <fredk@cs.wustl.edu> and Ossama + * Othman <ossama@ece.uci.edu> + * @author Modified by Balachandran Natarajan <bala@cs.wustl.edu> + */ +// =================================================================== #ifndef TAO_UIOP_TRANSPORT_H #define TAO_UIOP_TRANSPORT_H @@ -39,90 +32,88 @@ // Forward decls. -class TAO_UIOP_Client_Connection_Handler; -class TAO_UIOP_Server_Connection_Handler; class TAO_ORB_Core; typedef ACE_Svc_Handler<ACE_LSOCK_STREAM, ACE_NULL_SYNCH> TAO_UIOP_SVC_HANDLER; +/** + * @class TAO_UIOP_Transport + * + * @brief Specialization of the base TAO_Transport class to handle the + * UIOP protocol. + * + * + * + */ + class TAO_Strategies_Export TAO_UIOP_Transport : public TAO_Transport { - // = TITLE - // This class acts as a bridge class to the transport specific - // connection handler (handler_). - // - // = DESCRIPTION - // Specialization of the base TAO_Transport class to handle the UIOP - // protocol. This class in turn will be further specialized for - // the client and server side. public: - TAO_UIOP_Transport (TAO_ORB_Core *orb_core); - // Base object's creator method. - ~TAO_UIOP_Transport (void); - // Default destructor. + /// Constructor. + TAO_UIOP_Transport (TAO_UIOP_Connection_Handler *handler, + TAO_ORB_Core *orb_core, + CORBA::Boolean flag); - // = The TAO_Transport methods, please check the documentation in - // "tao/Pluggable.h" for more details. + /// Default destructor. + ~TAO_UIOP_Transport (void); + /// Return the connection service handler + TAO_UIOP_SVC_HANDLER *service_handler (void); + /// The TAO_Transport methods, please check the documentation in + /// "tao/Pluggable.h" for more details. virtual ACE_HANDLE handle (void); + virtual ACE_Event_Handler *event_handler (void); + + virtual void close_connection (void); + + virtual int idle (void); + + /// Write the complete Message_Block chain to the connection. virtual ssize_t send (TAO_Stub *stub, int two_way, const ACE_Message_Block *mblk, const ACE_Time_Value *s = 0); + virtual ssize_t send (const ACE_Message_Block *mblk, const ACE_Time_Value *s = 0, size_t *bytes_transferred = 0); + + + /// Write the contents of the buffer of length len to the + /// connection. virtual ssize_t send (const u_char *buf, size_t len, const ACE_Time_Value *s = 0); + + /// Read len bytes from into buf. virtual ssize_t recv (char *buf, size_t len, const ACE_Time_Value *s = 0); + + /// Read and process the message from the connection. The processing + /// of the message is done by delegating the work to the underlying + /// messaging object + virtual int read_process_message (ACE_Time_Value *max_time_value = 0, + int block =0); + + virtual int register_handler (void); + + /// @@TODO: These methods IMHO should have more meaningful + /// names. The names seem to indicate nothing. virtual int send_request (TAO_Stub *stub, - TAO_ORB_Core *orb_core , + TAO_ORB_Core *orb_core, TAO_OutputCDR &stream, int twoway, ACE_Time_Value *max_wait_time); - - virtual CORBA::Boolean - send_request_header (TAO_Operation_Details &opdetails, - TAO_Target_Specification &spec, - TAO_OutputCDR &msg); - - virtual TAO_UIOP_SVC_HANDLER *service_handler (void) = 0; - // Acces the underlying connection handler - -}; - -class TAO_Strategies_Export TAO_UIOP_Client_Transport : public TAO_UIOP_Transport -{ - // = TITLE - // The Transport class used for Client side communication with a - // server. - // - // = DESCRIPTION - // Specialization of the TAO_UIOP_Transport class for client - // side. Methods related to sending one and two way requests - // lives here. -public: - TAO_UIOP_Client_Transport (TAO_UIOP_Client_Connection_Handler *handler, - TAO_ORB_Core *orb_core); - // Constructor. Note, TAO_UIOP_Handler_Base is the base class for - // both TAO_UIOP_Client_Connection_Handler and - // TAO_UIOP_Server_Connection_Handler. - - ~TAO_UIOP_Client_Transport (void); - // destructor - - // = The TAO_Transport methods, please check the documentation in - // "tao/Pluggable.h" for more details. - virtual void close_connection (void); - virtual int idle (void); + virtual int send_message (TAO_OutputCDR &stream, + TAO_Stub *stub = 0, + int twoway = 1, + ACE_Time_Value *max_time_wait = 0); virtual void start_request (TAO_ORB_Core *orb_core, TAO_Target_Specification &spec, @@ -137,91 +128,32 @@ public: CORBA::Environment &ACE_TRY_ENV = TAO_default_environment ()) ACE_THROW_SPEC ((CORBA::SystemException)); - virtual int send_request (TAO_Stub *stub, - TAO_ORB_Core *orb_core, - TAO_OutputCDR &stream, - int twoway, - ACE_Time_Value *max_wait_time); - virtual int handle_client_input (int block = 0, - ACE_Time_Value *max_time_value = 0); - virtual int register_handler (void); - // Register the handler with the reactor. This will be called by the - // Wait Strategy if Reactor is used for that strategy. - - TAO_UIOP_SVC_HANDLER *service_handler (void); - // Access the underlying connection handler virtual CORBA::Boolean - send_request_header (TAO_Operation_Details &opdetail, + send_request_header (TAO_Operation_Details &opdetails, TAO_Target_Specification &spec, TAO_OutputCDR &msg); - int messaging_init (CORBA::Octet major, - CORBA::Octet minor); - // Initialising the messaging object - - void use_lite (CORBA::Boolean flag); - // Set the lite flag + /// Initialising the messaging object + virtual int messaging_init (CORBA::Octet major, + CORBA::Octet minor); private: - TAO_UIOP_Client_Connection_Handler *handler_; - // The connection service handler used for accessing lower layer - // communication protocols. - - TAO_Pluggable_Messaging *client_mesg_factory_; - // The message_factor instance specific for this particular - // transport protocol. + /// Process the message that we have read + int process_message (void); - TAO_ORB_Core *orb_core_; - // Our orb Core +private: - CORBA::Boolean lite_flag_; - // We using GIOP lite? + /// The connection service handler used for accessing lower layer + /// communication protocols. + TAO_UIOP_Connection_Handler *connection_handler_; - TAO_Pluggable_Reply_Params params_; - // The reply data that is sent back by the server + /// Our messaging object. + TAO_Pluggable_Messaging *messaging_object_; }; -// **************************************************************** - -class TAO_Strategies_Export TAO_UIOP_Server_Transport : public TAO_UIOP_Transport -{ - // = TITLE - // The Transport class used for server communication with a - // connected client. - // - // = DESCRIPTION - // Specialization of the TAO_UIOP_Transport class for the server side. - // methods for reading messages (requests) and sending replies live - // here. -public: - - TAO_UIOP_Server_Transport (TAO_UIOP_Server_Connection_Handler *handler, - TAO_ORB_Core* orb_core); - // Default creator method. - - ~TAO_UIOP_Server_Transport (void); - // Default destructor - - // See Pluggable.h for documentation - virtual void close_connection (void); - virtual int idle (void); - - TAO_UIOP_SVC_HANDLER *service_handler (void); - // Access the underlying connection handler - - TAO_GIOP_Message_State message_state_; - // This keep the state of the current message, to enable - // non-blocking reads, fragment reassembly, etc. -private: - - TAO_UIOP_Server_Connection_Handler *handler_; - // the connection service handler used for accessing lower layer - // communication protocols. - -}; #if defined (__ACE_INLINE__) #include "UIOP_Transport.i" diff --git a/TAO/tao/Strategies/UIOP_Transport.i b/TAO/tao/Strategies/UIOP_Transport.i index 4557ef511c6..81bf354f364 100644 --- a/TAO/tao/Strategies/UIOP_Transport.i +++ b/TAO/tao/Strategies/UIOP_Transport.i @@ -1,8 +1,2 @@ // -*- C++ -*- //$Id$ - -ACE_INLINE void -TAO_UIOP_Client_Transport::use_lite (CORBA::Boolean flag) -{ - this->lite_flag_ = flag; -} diff --git a/TAO/tao/TAO.dsp b/TAO/tao/TAO.dsp index d529170756b..89d809d44e0 100644 --- a/TAO/tao/TAO.dsp +++ b/TAO/tao/TAO.dsp @@ -1240,6 +1240,25 @@ SOURCE=.\GIOP_Message_Headers.cpp # End Source File
# Begin Source File
+SOURCE=.\GIOP_Message_Lite.cpp
+
+!IF "$(CFG)" == "TAO DLL - Win32 Alpha Release"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 Alpha Debug"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 MFC Release"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 MFC Debug"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 Release"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 Debug"
+
+!ENDIF
+
+# End Source File
+# Begin Source File
+
SOURCE=.\GIOP_Message_State.cpp
!IF "$(CFG)" == "TAO DLL - Win32 Alpha Release"
diff --git a/TAO/tao/Wait_On_Leader_Follower.cpp b/TAO/tao/Wait_On_Leader_Follower.cpp index 55da4d7ff01..2aad7d8dfe5 100644 --- a/TAO/tao/Wait_On_Leader_Follower.cpp +++ b/TAO/tao/Wait_On_Leader_Follower.cpp @@ -204,7 +204,6 @@ TAO_Wait_On_Leader_Follower::wait (ACE_Time_Value *max_wait_time, while (1) { - cout << "In the while loop... " <<endl; // Run the event loop. result = reactor->handle_events (max_wait_time); diff --git a/TAO/tao/Wait_On_Read.cpp b/TAO/tao/Wait_On_Read.cpp index df078b709d7..bca74215a05 100644 --- a/TAO/tao/Wait_On_Read.cpp +++ b/TAO/tao/Wait_On_Read.cpp @@ -25,7 +25,7 @@ TAO_Wait_On_Read::wait (ACE_Time_Value * max_wait_time, while (reply_received != 1) { reply_received = - this->transport_->handle_client_input (1, max_wait_time); + this->transport_->read_process_message (max_wait_time, 1); if (reply_received == -1) return -1; } |