diff options
author | bala <bala@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2001-07-23 16:47:59 +0000 |
---|---|---|
committer | bala <bala@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2001-07-23 16:47:59 +0000 |
commit | f03470a0da6af5715b281aca5826be848c7a7c2f (patch) | |
tree | a676e62438b1f2d1d4dd00c7492d51e1fe7dce94 /TAO/tao | |
parent | 05ad64bb238e59fddd588743e0412d374e2de787 (diff) | |
download | ATCD-f03470a0da6af5715b281aca5826be848c7a7c2f.tar.gz |
ChangeLogTag: Mon Jul 23 11:44:30 2001 Balachandran Natarajan <bala@cs.wustl.edu>
Diffstat (limited to 'TAO/tao')
-rw-r--r-- | TAO/tao/Connection_Handler.h | 2 | ||||
-rw-r--r-- | TAO/tao/GIOP_Message_Base.cpp | 52 | ||||
-rw-r--r-- | TAO/tao/GIOP_Message_Base.h | 6 | ||||
-rw-r--r-- | TAO/tao/GIOP_Message_Generator_Parser_12.cpp | 15 | ||||
-rw-r--r-- | TAO/tao/GIOP_Message_Lite.cpp | 625 | ||||
-rw-r--r-- | TAO/tao/GIOP_Message_Lite.h | 104 | ||||
-rw-r--r-- | TAO/tao/IIOP_Transport.cpp | 6 | ||||
-rw-r--r-- | TAO/tao/Incoming_Message_Queue.cpp | 14 | ||||
-rw-r--r-- | TAO/tao/Incoming_Message_Queue.h | 18 | ||||
-rw-r--r-- | TAO/tao/Incoming_Message_Queue.inl | 38 | ||||
-rw-r--r-- | TAO/tao/Makefile | 1 | ||||
-rw-r--r-- | TAO/tao/Pluggable_Messaging.h | 6 | ||||
-rw-r--r-- | TAO/tao/Strategies/DIOP_Transport.cpp | 22 | ||||
-rw-r--r-- | TAO/tao/Strategies/SHMIOP_Transport.cpp | 5 | ||||
-rw-r--r-- | TAO/tao/Strategies/UIOP_Transport.cpp | 8 | ||||
-rw-r--r-- | TAO/tao/TAO.dsp | 12 | ||||
-rw-r--r-- | TAO/tao/Transport.cpp | 82 | ||||
-rw-r--r-- | TAO/tao/Transport.h | 6 | ||||
-rw-r--r-- | TAO/tao/Transport.inl | 3 | ||||
-rw-r--r-- | TAO/tao/orbconf.h | 13 |
20 files changed, 709 insertions, 329 deletions
diff --git a/TAO/tao/Connection_Handler.h b/TAO/tao/Connection_Handler.h index 9699d5e5414..ae74b4f8ad1 100644 --- a/TAO/tao/Connection_Handler.h +++ b/TAO/tao/Connection_Handler.h @@ -6,7 +6,7 @@ * * $Id$ * - * @author Bala Natarajan <bala@cs.wustl.edu> + * @author Balachandran Natarajan <bala@cs.wustl.edu> */ //============================================================================= diff --git a/TAO/tao/GIOP_Message_Base.cpp b/TAO/tao/GIOP_Message_Base.cpp index c9c4fe10904..3df80faebbc 100644 --- a/TAO/tao/GIOP_Message_Base.cpp +++ b/TAO/tao/GIOP_Message_Base.cpp @@ -1,5 +1,6 @@ // $Id$ + #include "GIOP_Message_Base.h" #include "operation_details.h" #include "GIOP_Utils.h" @@ -18,6 +19,7 @@ ACE_RCSID (tao, GIOP_Message_Base, "$Id$") + TAO_GIOP_Message_Base::TAO_GIOP_Message_Base (TAO_ORB_Core *orb_core, size_t /*input_cdr_size*/) : orb_core_ (orb_core), @@ -46,7 +48,7 @@ TAO_GIOP_Message_Base::init (CORBA::Octet major, void -TAO_GIOP_Message_Base::reset (int /* reset_flag */) +TAO_GIOP_Message_Base::reset (void) { // no-op } @@ -254,10 +256,14 @@ TAO_GIOP_Message_Base::message_type ( case TAO_GIOP_CLOSECONNECTION: return TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION; + case TAO_GIOP_FRAGMENT: + return TAO_PLUGGABLE_MESSAGE_FRAGMENT; + case TAO_GIOP_CANCELREQUEST: case TAO_GIOP_MESSAGERROR: - case TAO_GIOP_FRAGMENT: // Never happens: why?? + + default: ACE_ERROR ((LM_ERROR, ACE_TEXT ("TAO (%P|%t) %N:%l message_type : ") @@ -386,8 +392,7 @@ TAO_GIOP_Message_Base::consolidate_node (TAO_Queued_Data *qd, // Copy the pay load.. // Calculate the bytes that needs to be copied in the queue... - size_t copy_len = - state.message_size () - TAO_GIOP_MESSAGE_HEADER_LEN; + size_t copy_len = state.payload_size (); // If teh data that needs to be copied is more than that is // available to us .. @@ -449,6 +454,42 @@ TAO_GIOP_Message_Base::consolidate_node (TAO_Queued_Data *qd, } +int +TAO_GIOP_Message_Base::consolidate_fragments (TAO_Queued_Data *dqd, + const TAO_Queued_Data *sqd) +{ + if (dqd->byte_order_ != sqd->byte_order_ + || dqd->major_version_ != sqd->major_version_ + || dqd->minor_version_ != sqd->minor_version_) + { + // Yes, print it out in all debug levels!. This is an error by + // CORBA 2.4 spec + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) incompatible fragments:") + ACE_TEXT ("different GIOP versions or byte order\n"))); + return -1; + } + + // Skip the header in the incoming message + sqd->msg_block_->rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN); + + // If we have a fragment header skip the header length too.. + if (sqd->minor_version_ == 2) + sqd->msg_block_->rd_ptr (TAO_GIOP_MESSAGE_FRAGMENT_HEADER); + + // Get the length of the incoming message block.. + int incoming_size = sqd->msg_block_->length (); + + // Increase the size of the destination message block + dqd->msg_block_->size (incoming_size); + + // Copy the data + dqd->msg_block_->copy (sqd->msg_block_->rd_ptr (), + incoming_size); + + return 0; +} + void TAO_GIOP_Message_Base::get_message_data (TAO_Queued_Data *qd) { @@ -460,6 +501,9 @@ TAO_GIOP_Message_Base::get_message_data (TAO_Queued_Data *qd) qd->minor_version_ = this->message_state_.giop_version_.minor; + qd->more_fragments_ = + this->message_state_.more_fragments_; + qd->msg_type_= this->message_type (this->message_state_); diff --git a/TAO/tao/GIOP_Message_Base.h b/TAO/tao/GIOP_Message_Base.h index 028deebdb5f..6d5337be3e9 100644 --- a/TAO/tao/GIOP_Message_Base.h +++ b/TAO/tao/GIOP_Message_Base.h @@ -58,7 +58,7 @@ public: CORBA::Octet minor); /// Reset the messaging the object - virtual void reset (int reset_flag = 1); + virtual void reset (void); /// Write the RequestHeader in to the <cdr> stream. The underlying /// implementation of the mesaging should do the right thing. @@ -114,6 +114,10 @@ public: /// Get the details of the message parsed through the <qd>. virtual void get_message_data (TAO_Queued_Data *qd); + /// @@Bala:Docu?? + virtual int consolidate_fragments (TAO_Queued_Data *dqd, + const TAO_Queued_Data *sqd); + /// Process the request message that we have received on the /// connection virtual int process_request_message (TAO_Transport *transport, diff --git a/TAO/tao/GIOP_Message_Generator_Parser_12.cpp b/TAO/tao/GIOP_Message_Generator_Parser_12.cpp index 4f6f5b054cb..c42fa68ae90 100644 --- a/TAO/tao/GIOP_Message_Generator_Parser_12.cpp +++ b/TAO/tao/GIOP_Message_Generator_Parser_12.cpp @@ -206,6 +206,11 @@ TAO_GIOP_Message_Generator_Parser_12::write_locate_reply_mesg ( // Make the header for the locate request output.write_ulong (status_info.status); + // Note: We dont align the pointer to an 8 byte boundary for a + // locate reply body. This is due to an urgent issue raised by Michi + // in the OMG. I discussed this with Michi today (09/07/2001) and I + // learn that this has been passed. Hence the change.. + /* if (status_info.status == TAO_GIOP_OBJECT_FORWARD || status_info.status == TAO_GIOP_OBJECT_FORWARD_PERM) { @@ -215,7 +220,7 @@ TAO_GIOP_Message_Generator_Parser_12::write_locate_reply_mesg ( return 0; } } - + */ switch (status_info.status) { case TAO_GIOP_OBJECT_FORWARD: @@ -467,11 +472,15 @@ TAO_GIOP_Message_Generator_Parser_12::parse_locate_reply ( return -1; - if (cdr.length () > 0) + // Note: We dont align the pointer to an 8 byte boundary for a + // locate reply body. This is due to an urgent issue raised by Michi + // in the OMG. I discussed this with Michi today (09/07/2001) and I + // learn that this has been passed. Hence the change.. + /*if (cdr.length () > 0) { // Align the read pointer on an 8-byte boundary cdr.align_read_ptr (TAO_GIOP_MESSAGE_ALIGN_PTR); - } + }*/ // Steal the contents in to the reply CDR and loose ownership of the // data block. diff --git a/TAO/tao/GIOP_Message_Lite.cpp b/TAO/tao/GIOP_Message_Lite.cpp index d89da3c9653..c9b42560240 100644 --- a/TAO/tao/GIOP_Message_Lite.cpp +++ b/TAO/tao/GIOP_Message_Lite.cpp @@ -23,60 +23,17 @@ static const size_t TAO_GIOP_LITE_MESSAGE_SIZE_OFFSET = 0; static const size_t TAO_GIOP_LITE_MESSAGE_TYPE_OFFSET = 4; TAO_GIOP_Message_Lite::TAO_GIOP_Message_Lite (TAO_ORB_Core *orb_core, - size_t input_cdr_size) - :message_state_ (orb_core), - output_ (0), - cdr_buffer_alloc_ ( - orb_core->resource_factory ()->output_cdr_buffer_allocator () - ), - cdr_dblock_alloc_ ( - orb_core->resource_factory ()->output_cdr_dblock_allocator () - ), - cdr_msgblock_alloc_ ( - orb_core->resource_factory ()->output_cdr_msgblock_allocator () - ), - input_cdr_ (orb_core->create_input_cdr_data_block (input_cdr_size), - 0, - TAO_ENCAP_BYTE_ORDER, - TAO_DEF_GIOP_MAJOR, - TAO_DEF_GIOP_MINOR, - orb_core), - current_offset_ (0) + size_t /*input_cdr_size*/) + : orb_core_ (orb_core), + message_type_ (0), + message_size_ (0), + byte_order_ (ACE_CDR_BYTE_ORDER) { -#if defined (ACE_HAS_PURIFY) - (void) ACE_OS::memset (this->repbuf_, - '\0', - sizeof this->repbuf_); -#endif /* ACE_HAS_PURIFY */ - ACE_NEW (this->output_, - TAO_OutputCDR (this->repbuf_, - sizeof this->repbuf_, - TAO_ENCAP_BYTE_ORDER, - this->cdr_buffer_alloc_, - this->cdr_dblock_alloc_, - this->cdr_msgblock_alloc_, - orb_core->orb_params ()->cdr_memcpy_tradeoff (), - TAO_DEF_GIOP_MAJOR, - TAO_DEF_GIOP_MINOR, - orb_core->to_iso8859 (), - orb_core->to_unicode ())); } TAO_GIOP_Message_Lite::~TAO_GIOP_Message_Lite (void) { - // Explicitly call the destructor of the output CDR first. They need - // the allocators during destruction. - delete this->output_; - - // Then call the destructor of our allocators - if (this->cdr_dblock_alloc_ != 0) - this->cdr_dblock_alloc_->remove (); - // delete this->cdr_dblock_alloc_; - - if (this->cdr_buffer_alloc_ != 0) - this->cdr_buffer_alloc_->remove (); - // delete this->cdr_buffer_alloc_; } @@ -87,40 +44,11 @@ TAO_GIOP_Message_Lite::init (CORBA::Octet, return; } -int -TAO_GIOP_Message_Lite::parse_header (void) -{ - // Get the read pointer - char *buf = this->input_cdr_.rd_ptr (); - - // @@ Bala: i added the following comment, does it make sense? - // In GIOPLite the version, byte order info, etc. are hardcoded, and - // not transmitted over the wire. - this->message_state_.byte_order = TAO_ENCAP_BYTE_ORDER; - this->message_state_.giop_version.major = TAO_DEF_GIOP_MAJOR; - this->message_state_.giop_version.minor = TAO_DEF_GIOP_MINOR; - - // Get the message type. - this->message_state_.message_type = buf[TAO_GIOP_LITE_MESSAGE_TYPE_OFFSET]; - - this->input_cdr_.reset_byte_order (this->message_state_.byte_order); - - // The first bytes are the length of the message. - this->input_cdr_.read_ulong (this->message_state_.message_size); - - return 0; -} - - void -TAO_GIOP_Message_Lite::reset (int reset_flag) +TAO_GIOP_Message_Lite::reset (void) { - // Reset the message state - this->message_state_.reset (reset_flag); - - if (reset_flag) - this->input_cdr_.reset_contents (); - //What else??? + this->message_type_ = 0; + this->message_size_ = 0; } @@ -222,105 +150,10 @@ TAO_GIOP_Message_Lite::generate_reply_header ( int -TAO_GIOP_Message_Lite::read_message (TAO_Transport *transport, +TAO_GIOP_Message_Lite::read_message (TAO_Transport * /*transport*/, int /*block */, - ACE_Time_Value *max_wait_time) + ACE_Time_Value * /*max_wait_time*/) { - if (this->message_state_.header_received () == 0) - { - int retval = - TAO_GIOP_Utils::read_bytes_input (transport, - this->input_cdr_, - TAO_GIOP_LITE_HEADER_LEN , - max_wait_time); - if (retval == -1) - { - if (TAO_debug_level > 0) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - \n") - ACE_TEXT ("TAO_GIOP_Message_Lite::read_message \n"))); - } - - return -1; - } - - // Read the rest of the stuff. That should be read by the - // corresponding states - if (this->parse_header () == -1) - { - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t|%N%l) -\n"), - ACE_TEXT ("TAO_GIOP_Message_Lite::handle_input \n"))); - return -1; - } - - if (this->input_cdr_.grow (TAO_GIOP_LITE_HEADER_LEN + - this->message_state_.message_size) == -1) - { - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t|%N|%l) - %p\n"), - ACE_TEXT ("ACE_CDR::grow"))); - return -1; - } - - // Growing the buffer may have reset the rd_ptr(), but we want - // to leave it just after the GIOP header (that was parsed - // already); - this->input_cdr_.skip_bytes (TAO_GIOP_LITE_HEADER_LEN); - } - - size_t missing_data = - this->message_state_.message_size - this->current_offset_; - - ssize_t n = - TAO_GIOP_Utils::read_buffer (transport, - this->input_cdr_.rd_ptr () - + this->current_offset_, - missing_data, - max_wait_time); - - if (n == -1) - { - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - %p\n"), - ACE_TEXT ("TAO_GIOP_Message_Lite::handle_input, read_buffer[1] \n"))); - return -1; - } - else if (n == 0) - { - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - %p\n"), - ACE_TEXT ("TAO_GIOP_Message_Lite::handle_input, read_buffer[2]\n"))); - return -1; - } - - this->current_offset_ += n; - - if (this->current_offset_ == - this->message_state_.message_size) - { - if (TAO_debug_level >= 4) - { - size_t header_len = TAO_GIOP_LITE_HEADER_LEN ; - - char *buf = this->input_cdr_.rd_ptr (); - buf -= header_len; - size_t msg_len = this->input_cdr_.length () + header_len; - this->dump_msg ("recv", - ACE_reinterpret_cast (u_char *, - buf), - msg_len); - } - } - - if (this->current_offset_ != this->message_state_.message_size) - return 0; - return 1; } @@ -394,16 +227,46 @@ TAO_GIOP_Message_Lite::format_message (TAO_OutputCDR &stream) } +int +TAO_GIOP_Message_Lite::parse_incoming_messages (ACE_Message_Block &block) +{ + // Get the read pointer + char *buf = block.rd_ptr (); + + CORBA::ULong x = 0; +#if !defined (ACE_DISABLE_SWAP_ON_READ) + if (!(this->byte_order_ != ACE_CDR_BYTE_ORDER)) + { + x = *ACE_reinterpret_cast (ACE_CDR::ULong*, buf); + } + else + { + ACE_CDR::swap_4 (buf, ACE_reinterpret_cast (char*, &x)); + } +#else + x = *ACE_reinterpret_cast (ACE_CDR::ULong*, buf); +#endif /* ACE_DISABLE_SWAP_ON_READ */ + + this->message_size_ = x; + + // Get the message type. + this->message_type_ = buf[TAO_GIOP_LITE_MESSAGE_TYPE_OFFSET]; + + return 0; +} + TAO_Pluggable_Message_Type TAO_GIOP_Message_Lite::message_type (void) { - switch (this->message_state_.message_type) + switch (this->message_type_) { case TAO_GIOP_REQUEST: - case TAO_GIOP_LOCATEREQUEST: return TAO_PLUGGABLE_MESSAGE_REQUEST; + case TAO_GIOP_LOCATEREQUEST: + return TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST; case TAO_GIOP_LOCATEREPLY: + return TAO_PLUGGABLE_MESSAGE_LOCATEREPLY; case TAO_GIOP_REPLY: return TAO_PLUGGABLE_MESSAGE_REPLY; @@ -425,54 +288,265 @@ TAO_GIOP_Message_Lite::message_type (void) } +ssize_t +TAO_GIOP_Message_Lite::missing_data (ACE_Message_Block &block) +{ + // Actual message size including the header.. + CORBA::ULong msg_size = + this->message_size_ + TAO_GIOP_LITE_HEADER_LEN; + + size_t len = block.length (); + + if (len > msg_size) + { + return -1; + } + else if (len == msg_size) + return 0; + + return msg_size - len; +} + + int -TAO_GIOP_Message_Lite::process_request_message (TAO_Transport *transport, - TAO_ORB_Core *orb_core) +TAO_GIOP_Message_Lite::extract_next_message (ACE_Message_Block &incoming, + TAO_Queued_Data *&qd) { - // Set the upcall thread - orb_core->lf_strategy ().set_upcall_thread (orb_core->leader_follower ()); + if (incoming.length () < TAO_GIOP_LITE_HEADER_LEN) + { + if (incoming.length () > 0) + { + // Make a node which has a message block of the size of + // MESSAGE_HEADER_LEN. + qd = + this->make_queued_data (TAO_GIOP_LITE_HEADER_LEN); + + qd->msg_block_->copy (incoming.rd_ptr (), + incoming.length ()); + qd->missing_data_ = -1; + } + return 0; + } - // Reset the output CDR stream. - // @@@@Is it necessary here? - this->output_->reset (); + if (this->parse_incoming_messages (incoming) == -1) + { + return -1; + } - // - // Take out all the information from the <message_state> and reset - // it so that nested upcall on the same transport can be handled. - // + size_t copying_len = + this->message_size_ + TAO_GIOP_LITE_HEADER_LEN; - // Notice that the message_state is only modified in one thread at a - // time because the reactor does not call handle_input() for the - // same Event_Handler in two threads at the same time. + qd = this->make_queued_data (copying_len); - // Steal the input CDR from the message state. - TAO_InputCDR input_cdr (ACE_InputCDR::Transfer_Contents (this->input_cdr_), - orb_core); + if (copying_len > incoming.length ()) + { + qd->missing_data_ = + copying_len - incoming.length (); - // Send the message state for the service layer like FT to log the - // messages - // @@@ Needed for DOORS - // orb_core->services_log_msg_rcv (this->message_state_); + copying_len = incoming.length (); + } + + qd->msg_block_->copy (incoming.rd_ptr (), + copying_len); + + incoming.rd_ptr (copying_len); + qd->byte_order_ = TAO_ENCAP_BYTE_ORDER; + qd->major_version_ = TAO_DEF_GIOP_MAJOR; + qd->minor_version_ = TAO_DEF_GIOP_MINOR; + qd->msg_type_ = this->message_type (); + return 1; +} + +int +TAO_GIOP_Message_Lite::consolidate_node (TAO_Queued_Data *qd, + ACE_Message_Block &incoming) +{ + // Look to see whether we had atleast parsed the GIOP header ... + if (qd->missing_data_ == -1) + { + // The data length that has been stuck in there during the last + // read .... + size_t len = + qd->msg_block_->length (); + + // We know that we would have space for + // TAO_GIOP_MESSAGE_HEADER_LEN here. So copy that much of data + // from the <incoming> into the message block in <qd> + qd->msg_block_->copy (incoming.rd_ptr (), + TAO_GIOP_LITE_HEADER_LEN - len); + + // Move the rd_ptr () in the incoming message block.. + incoming.rd_ptr (TAO_GIOP_LITE_HEADER_LEN - len); + + // Parse the message header now... + if (this->parse_incoming_messages (*qd->msg_block_) == -1) + return -1; + + // Now grow the message block so that we can copy the rest of + // the data... + ACE_CDR::grow (qd->msg_block_, + this->message_size_ + TAO_GIOP_LITE_HEADER_LEN); + + // Copy the pay load.. + + // Calculate the bytes that needs to be copied in the queue... + size_t copy_len = this->message_size_; + + // If teh data that needs to be copied is more than that is + // available to us .. + if (copy_len > incoming.length ()) + { + // Calculate the missing data.. + qd->missing_data_ = + copy_len - incoming.length (); + + // Set the actual possible copy_len that is available... + copy_len = incoming.length (); + } + else + { + qd->missing_data_ = 0; + } + + // ..now we are set to copy the right amount of data to the + // node.. + qd->msg_block_->copy (incoming.rd_ptr (), + copy_len); + + // Set the <rd_ptr> of the <incoming>.. + incoming.rd_ptr (copy_len); + + // Get the other details... + qd->byte_order_ = TAO_ENCAP_BYTE_ORDER; + qd->major_version_ = TAO_DEF_GIOP_MAJOR; + qd->minor_version_ = TAO_DEF_GIOP_MINOR; + qd->msg_type_ = this->message_type (); + } + else + { + // @@todo: Need to abstract this out to a seperate method... + size_t copy_len = qd->missing_data_; + + if (copy_len > incoming.length ()) + { + // Calculate the missing data.. + qd->missing_data_ = + copy_len - incoming.length (); + + // Set the actual possible copy_len that is available... + copy_len = incoming.length (); + } + + // Copy the right amount of data in to the node... + // node.. + qd->msg_block_->copy (incoming.rd_ptr (), + copy_len); + + // Set the <rd_ptr> of the <incoming>.. + qd->msg_block_->rd_ptr (copy_len); + + } + + return 0; +} + + +void +TAO_GIOP_Message_Lite::get_message_data (TAO_Queued_Data *qd) +{ + // Get the message information + qd->byte_order_ = + ACE_CDR_BYTE_ORDER; + qd->major_version_ = + TAO_DEF_GIOP_MAJOR; + qd->minor_version_ = + TAO_DEF_GIOP_MINOR; + + qd->msg_type_= + this->message_type (); + + this->reset (); +} + +int +TAO_GIOP_Message_Lite::consolidate_fragments (TAO_Queued_Data * /*dqd*/, + const TAO_Queued_Data */*sqd*/) +{ + // We dont know what fragments are??? + return -1; +} + +int +TAO_GIOP_Message_Lite::process_request_message (TAO_Transport *transport, + TAO_Queued_Data *qd) +{ + // Set the upcall thread + this->orb_core_->lf_strategy ().set_upcall_thread ( + this->orb_core_->leader_follower ()); + + // A buffer that we will use to initialise the CDR stream + char repbuf[ACE_CDR::DEFAULT_BUFSIZE]; + +#if defined(ACE_HAS_PURIFY) + (void) ACE_OS::memset (repbuf, + '\0', + sizeof repbuf); +#endif /* ACE_HAS_PURIFY */ + + // Initialze an output CDR on the stack + TAO_OutputCDR output (repbuf, + sizeof repbuf, + TAO_ENCAP_BYTE_ORDER, + this->orb_core_->output_cdr_buffer_allocator (), + this->orb_core_->output_cdr_dblock_allocator (), + this->orb_core_->output_cdr_msgblock_allocator (), + this->orb_core_->orb_params ()->cdr_memcpy_tradeoff (), + qd->major_version_, + qd->minor_version_, + this->orb_core_->to_iso8859 (), + this->orb_core_->to_unicode ()); + + // Get the read and write positions before we steal data. + size_t rd_pos = qd->msg_block_->rd_ptr () - qd->msg_block_->base (); + size_t wr_pos = qd->msg_block_->wr_ptr () - qd->msg_block_->base (); + rd_pos += TAO_GIOP_LITE_HEADER_LEN; + + this->dump_msg ("recv", + ACE_reinterpret_cast (u_char *, + qd->msg_block_->rd_ptr ()), + qd->msg_block_->length ()); + + + // Create a input CDR stream. + // NOTE: We use the same data block in which we read the message and + // we pass it on to the higher layers of the ORB. So we dont to any + // copies at all here. The same is also done in the higher layers. + + TAO_InputCDR input_cdr (qd->msg_block_->data_block (), + ACE_Message_Block::DONT_DELETE, + rd_pos, + wr_pos, + qd->byte_order_, + qd->major_version_, + qd->minor_version_, + this->orb_core_); - // Reset the message state. Now, we are ready for the next nested - // upcall if any. - this->message_state_.reset (0); // We know we have some request message. Check whether it is a // GIOP_REQUEST or GIOP_LOCATE_REQUEST to take action. - switch (this->message_state_.message_type) + switch (qd->msg_type_) { - case TAO_GIOP_REQUEST: + case TAO_PLUGGABLE_MESSAGE_REQUEST: // Should be taken care by the state specific invocations. They // could raise an exception or write things in the output CDR // stream return this->process_request (transport, - orb_core, - input_cdr); - case TAO_GIOP_LOCATEREQUEST: + input_cdr, + output); + case TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST: return this->process_locate_request (transport, - orb_core, - input_cdr); + input_cdr, + output); default: return -1; } @@ -480,22 +554,59 @@ TAO_GIOP_Message_Lite::process_request_message (TAO_Transport *transport, int TAO_GIOP_Message_Lite::process_reply_message ( - TAO_Pluggable_Reply_Params ¶ms, - TAO_Queued_Data * /* qd */ + TAO_Pluggable_Reply_Params ¶ms, + TAO_Queued_Data *qd ) { + + + // Get the read and write positions before we steal data. + size_t rd_pos = qd->msg_block_->rd_ptr () - qd->msg_block_->base (); + size_t wr_pos = qd->msg_block_->wr_ptr () - qd->msg_block_->base (); + rd_pos += TAO_GIOP_LITE_HEADER_LEN; + + this->dump_msg ("recv", + ACE_reinterpret_cast (u_char *, + qd->msg_block_->rd_ptr ()), + qd->msg_block_->length ()); + + + // Create a empty buffer on stack + // NOTE: We use the same data block in which we read the message and + // we pass it on to the higher layers of the ORB. So we dont to any + // copies at all here. The same is alos done in the higher layers. + TAO_InputCDR input_cdr (qd->msg_block_->data_block (), + ACE_Message_Block::DONT_DELETE, + rd_pos, + wr_pos, + qd->byte_order_, + qd->major_version_, + qd->minor_version_, + this->orb_core_); + + // Reset the message state. Now, we are ready for the next nested + // upcall if any. + // this->message_handler_.reset (0); + + // We know we have some reply message. Check whether it is a + // GIOP_REPLY or GIOP_LOCATE_REPLY to take action. + + // Once we send the InputCDR stream we need to just forget about + // the stream and never touch that again for anything. We basically + // loose ownership of the data_block. + // We know we have some reply message. Check whether it is a // GIOP_REPLY or GIOP_LOCATE_REPLY to take action. - switch (this->message_state_.message_type) + switch (qd->msg_type_) { case TAO_GIOP_REPLY: // Should be taken care by the state specific parsing - return this->parse_reply (this->input_cdr_, + return this->parse_reply (input_cdr, params); case TAO_GIOP_LOCATEREPLY: // We call parse_reply () here because, the message format for // the LOCATEREPLY & REPLY are same. - return this->parse_reply (this->input_cdr_, + return this->parse_reply (input_cdr, params); default: return -1; @@ -569,16 +680,16 @@ TAO_GIOP_Message_Lite::write_protocol_header ( int TAO_GIOP_Message_Lite::process_request (TAO_Transport *transport, - TAO_ORB_Core *orb_core, - TAO_InputCDR &cdr) + TAO_InputCDR &cdr, + TAO_OutputCDR &output) { // This will extract the request header, set <response_required> // and <sync_with_server> as appropriate. TAO_ServerRequest request (this, cdr, - *this->output_, + output, transport, - orb_core); + this->orb_core_); CORBA::Environment &ACE_TRY_ENV = TAO_default_environment (); @@ -603,16 +714,16 @@ TAO_GIOP_Message_Lite::process_request (TAO_Transport *transport, CORBA::Object_var forward_to; // Do this before the reply is sent. - orb_core->adapter_registry ()->dispatch (request.object_key (), - request, - forward_to, - ACE_TRY_ENV); + this->orb_core_->adapter_registry ()->dispatch (request.object_key (), + request, + forward_to, + ACE_TRY_ENV); ACE_TRY_CHECK; if (!CORBA::is_nil (forward_to.in ())) { // We should forward to another object... - TAO_Pluggable_Reply_Params reply_params (orb_core); + TAO_Pluggable_Reply_Params reply_params (this->orb_core_); reply_params.request_id_ = request_id; reply_params.reply_status_ = TAO_GIOP_LOCATION_FORWARD; reply_params.svc_ctx_.length (0); @@ -621,12 +732,12 @@ TAO_GIOP_Message_Lite::process_request (TAO_Transport *transport, reply_params.service_context_notowned (&request.reply_service_info ()); // Make the GIOP header and Reply header - this->generate_reply_header (*this->output_, + this->generate_reply_header (output, reply_params); - *this->output_ << forward_to.in (); + output << forward_to.in (); - int result = transport->send_message (*this->output_); + int result = transport->send_message (output); if (result == -1) { if (TAO_debug_level > 0) @@ -650,7 +761,7 @@ TAO_GIOP_Message_Lite::process_request (TAO_Transport *transport, if (response_required) { result = this->send_reply_exception (transport, - orb_core, + this->orb_core_, request_id, &request.reply_service_info (), &ACE_ANY_EXCEPTION); @@ -709,7 +820,7 @@ TAO_GIOP_Message_Lite::process_request (TAO_Transport *transport, ); result = this->send_reply_exception (transport, - orb_core, + this->orb_core_, request_id, &request.reply_service_info (), &exception); @@ -748,13 +859,13 @@ TAO_GIOP_Message_Lite::process_request (TAO_Transport *transport, int TAO_GIOP_Message_Lite::process_locate_request (TAO_Transport *transport, - TAO_ORB_Core* orb_core, - TAO_InputCDR &input) + TAO_InputCDR &input, + TAO_OutputCDR &output) { // This will extract the request header, set <response_required> as // appropriate. TAO_GIOP_Locate_Request_Header locate_request (input, - orb_core); + this->orb_core_); TAO_GIOP_Locate_Status_Msg status_info; @@ -802,7 +913,7 @@ TAO_GIOP_Message_Lite::process_locate_request (TAO_Transport *transport, "_non_existent", dummy_output, transport, - orb_core, + this->orb_core_, parse_error); if (parse_error != 0) @@ -813,10 +924,10 @@ TAO_GIOP_Message_Lite::process_locate_request (TAO_Transport *transport, CORBA::Object_var forward_to; - orb_core->adapter_registry ()->dispatch (server_request.object_key (), - server_request, - forward_to, - ACE_TRY_ENV); + this->orb_core_->adapter_registry ()->dispatch (server_request.object_key (), + server_request, + forward_to, + ACE_TRY_ENV); ACE_TRY_CHECK; if (!CORBA::is_nil (forward_to.in ())) @@ -877,6 +988,7 @@ TAO_GIOP_Message_Lite::process_locate_request (TAO_Transport *transport, ACE_ENDTRY; return this->make_send_locate_reply (transport, + output, locate_request, status_info); } @@ -885,6 +997,7 @@ TAO_GIOP_Message_Lite::process_locate_request (TAO_Transport *transport, int TAO_GIOP_Message_Lite::make_send_locate_reply ( TAO_Transport *transport, + TAO_OutputCDR &output, TAO_GIOP_Locate_Request_Header &request, TAO_GIOP_Locate_Status_Msg &status_info ) @@ -893,15 +1006,15 @@ TAO_GIOP_Message_Lite::make_send_locate_reply ( // different from the reply header made by the make_reply () call.. // Make the GIOP message header this->write_protocol_header (TAO_GIOP_LOCATEREPLY, - *this->output_); + output); // This writes the header & body - this->write_locate_reply_mesg (*this->output_, + this->write_locate_reply_mesg (output, request.request_id (), status_info); // Send the message - int result = transport->send_message (*this->output_); + int result = transport->send_message (output); // Print out message if there is an error if (result == -1) @@ -1454,6 +1567,38 @@ TAO_GIOP_Message_Lite::dump_msg (const char *label, } } +TAO_Queued_Data * +TAO_GIOP_Message_Lite::make_queued_data (size_t sz) +{ + // Get a node for the queue.. + TAO_Queued_Data *qd = + TAO_Queued_Data::get_queued_data (); + + // Make a datablock for the size requested + something. The + // "something" is required because we are going to align the data + // block in the message block. During alignment we could loose some + // bytes. As we may not know how many bytes will be lost, we will + // allocate ACE_CDR::MAX_ALIGNMENT extra. + ACE_Data_Block *db = + this->orb_core_->data_block_for_message_block (sz + + ACE_CDR::MAX_ALIGNMENT); + + ACE_Allocator *alloc = + this->orb_core_->message_block_msgblock_allocator (); + + ACE_Message_Block mb (db, + 0, + alloc); + + ACE_Message_Block *new_mb = mb.duplicate (); + + ACE_CDR::mb_align (new_mb); + + qd->msg_block_ = new_mb; + + return qd; +} + int TAO_GIOP_Message_Lite::generate_locate_reply_header ( TAO_OutputCDR & /*cdr*/, diff --git a/TAO/tao/GIOP_Message_Lite.h b/TAO/tao/GIOP_Message_Lite.h index 6086a4b11f3..8bd5202ca70 100644 --- a/TAO/tao/GIOP_Message_Lite.h +++ b/TAO/tao/GIOP_Message_Lite.h @@ -28,6 +28,7 @@ class TAO_Operation_Details; class TAO_Pluggable_Reply_Params; class TAO_GIOP_Locate_Request_Header; +class TAO_Queued_Data; /** * @class TAO_GIOP_Message_Lite @@ -53,7 +54,7 @@ public: virtual void init (CORBA::Octet, CORBA::Octet); /// Reset the messaging the object - virtual void reset (int reset_flag = 1); + virtual void reset (void); /// Write the RequestHeader in to the <cdr> stream. The underlying /// implementation of the mesaging should do the right thing. @@ -73,13 +74,7 @@ public: TAO_Pluggable_Reply_Params_Base ¶ms ); - /// This method reads the message on the connection. Returns 0 when - /// there is short read on the connection. Returns 1 when the full - /// message is read and handled. Returns -1 on errors. If <block> is - /// 1, then reply is read in a blocking manner. <bytes> indicates the - /// number of bytes that needs to be read from the connection. - /// GIOP uses this read to unmarshall the message details that appear - /// on the connection. + /// Dummy method to .. virtual int read_message (TAO_Transport *transport, int block = 0, ACE_Time_Value *max_wait_time = 0); @@ -90,6 +85,8 @@ public: /// the message. virtual int format_message (TAO_OutputCDR &cdr); + /// Parse the incoming messages.. + virtual int parse_incoming_messages (ACE_Message_Block &message_block); /// Get the message type. The return value would be one of the /// following: @@ -97,14 +94,35 @@ public: /// TAO_PLUGGABLE_MESSAGE_REPLY, /// TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION, /// TAO_PLUGGABLE_MESSAGE_MESSAGE_ERROR. - virtual TAO_Pluggable_Message_Type message_type (void); + TAO_Pluggable_Message_Type message_type (void); + /// Calculate the amount of data that is missing in the <incoming> + /// message block. + virtual ssize_t missing_data (ACE_Message_Block &message_block); + + /* Extract the details of the next message from the <incoming> + * through <qd>. Returns 1 if there are more messages and returns a + * 0 if there are no more messages in <incoming>. + */ + virtual int extract_next_message (ACE_Message_Block &incoming, + TAO_Queued_Data *&qd); + + /// Check whether the node <qd> needs consolidation from <incoming> + virtual int consolidate_node (TAO_Queued_Data *qd, + ACE_Message_Block &incoming); + + /// Get the details of the message parsed through the <qd>. + virtual void get_message_data (TAO_Queued_Data *qd); + + /// @@Bala: Docu??? + virtual int consolidate_fragments (TAO_Queued_Data *dqd, + const TAO_Queued_Data *sqd); /// Process the request message that we have received on the /// connection virtual int process_request_message (TAO_Transport *transport, - TAO_ORB_Core *orb_core); + TAO_Queued_Data *qd); /// Parse the reply message that we received and return the reply /// information though <reply_info> @@ -129,41 +147,26 @@ private: /// Processes the <GIOP_REQUEST> messages int process_request (TAO_Transport *transport, - TAO_ORB_Core *orb_core, - TAO_InputCDR &input); + TAO_InputCDR &input, + TAO_OutputCDR &output); /// Processes the <GIOP_LOCATE_REQUEST> messages int process_locate_request (TAO_Transport *transport, - TAO_ORB_Core *orb_core, - TAO_InputCDR &input); + TAO_InputCDR &input, + TAO_OutputCDR &output); /// Make a <GIOP_LOCATEREPLY> and hand that over to the transport so /// that it can be sent over the connection. /// NOTE:As on date 1.1 & 1.2 seem to have similar headers. Till an /// unmanageable difference comes let them be implemented here. int make_send_locate_reply (TAO_Transport *transport, + TAO_OutputCDR &output, TAO_GIOP_Locate_Request_Header &request, TAO_GIOP_Locate_Status_Msg &status); /// Send error messages int send_error (TAO_Transport *transport); - /// Parses the header of the GIOP messages for validity - int parse_header (void); - - /// Validates the first 4 bytes that contain the magic word - /// "GIOP". Also calls the validate_version () on the incoming - /// stream. - int parse_magic_bytes (void); - - /// This will do a validation of the stream that arrive in the - /// transport. - int validate_version (void); - - /// Set the state - void set_state (CORBA::Octet major, - CORBA::Octet minor); - /// Close a connection, first sending GIOP::CloseConnection. void send_close_connection (const TAO_GIOP_Message_Version &version, TAO_Transport *transport, @@ -182,6 +185,8 @@ private: const u_char *ptr, size_t len); + TAO_Queued_Data *make_queued_data (size_t sz); + /// Write the locate reply header virtual int generate_locate_reply_header ( TAO_OutputCDR & /*cdr*/, @@ -238,33 +243,18 @@ private: private: - /// The message state. It represents the status of the messages that - /// have been read from the connection. - TAO_GIOP_Message_State message_state_; - - /// Output CDR - TAO_OutputCDR *output_; - - /// Allocators for the output CDR that we hold. As we cannot rely on - /// the resources from ORB Core we reserve our own resources. The - /// reason that we cannot believe the ORB core is that, for a - /// multi-threaded servers it dishes out resources cached in - /// TSS. This would be dangerous as TSS gets destroyed before we - /// would. So we have our own memory that we can rely on. - /// Implementations of GIOP that we have - ACE_Allocator *cdr_buffer_alloc_; - ACE_Allocator *cdr_dblock_alloc_; - ACE_Allocator *cdr_msgblock_alloc_; - - /// A buffer that we will use to initialise the CDR stream - char repbuf_[ACE_CDR::DEFAULT_BUFSIZE]; - - /// The InputCDR stream in which the incoming messages are - /// read. This will be used to decode the messages. - TAO_InputCDR input_cdr_; - - /// The offset of the write pointer of the input cdr stream - size_t current_offset_; + /// Our copy of the ORB core... + TAO_ORB_Core *orb_core_; + + /// The message type that we are going to process.. + CORBA::Octet message_type_; + + /// The pay load size + CORBA::ULong message_size_; + + // The byte order.. + // NOTE: GIOP lite cannot work between heterogenous platforms.. + CORBA::Octet byte_order_; }; diff --git a/TAO/tao/IIOP_Transport.cpp b/TAO/tao/IIOP_Transport.cpp index d3c664db9af..85e5f4b1d84 100644 --- a/TAO/tao/IIOP_Transport.cpp +++ b/TAO/tao/IIOP_Transport.cpp @@ -16,7 +16,7 @@ #include "tao/ORB_Core.h" #include "tao/debug.h" #include "tao/GIOP_Message_Base.h" -//#include "tao/GIOP_Message_Lite.h" +#include "tao/GIOP_Message_Lite.h" #if !defined (__ACE_INLINE__) # include "tao/IIOP_Transport.i" @@ -26,13 +26,12 @@ ACE_RCSID (tao, IIOP_Transport, "$Id$") TAO_IIOP_Transport::TAO_IIOP_Transport (TAO_IIOP_Connection_Handler *handler, TAO_ORB_Core *orb_core, - CORBA::Boolean /*flag*/) + CORBA::Boolean flag) : TAO_Transport (TAO_TAG_IIOP_PROFILE, orb_core) , connection_handler_ (handler) , messaging_object_ (0) { -#if 0 if (flag) { // Use the lite version of the protocol @@ -40,7 +39,6 @@ TAO_IIOP_Transport::TAO_IIOP_Transport (TAO_IIOP_Connection_Handler *handler, TAO_GIOP_Message_Lite (orb_core)); } else -#endif { // Use the normal GIOP object ACE_NEW (this->messaging_object_, diff --git a/TAO/tao/Incoming_Message_Queue.cpp b/TAO/tao/Incoming_Message_Queue.cpp index 17f8b31d6b9..737e329ea7b 100644 --- a/TAO/tao/Incoming_Message_Queue.cpp +++ b/TAO/tao/Incoming_Message_Queue.cpp @@ -134,6 +134,7 @@ TAO_Queued_Data::TAO_Queued_Data (void) byte_order_ (0), major_version_ (0), minor_version_ (0), + more_fragments_ (0), msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR), next_ (0) { @@ -145,7 +146,20 @@ TAO_Queued_Data::TAO_Queued_Data (ACE_Message_Block *mb) byte_order_ (0), major_version_ (0), minor_version_ (0), + more_fragments_ (0), msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR), next_ (0) { } + +TAO_Queued_Data::TAO_Queued_Data (const TAO_Queued_Data &qd) + : msg_block_ (qd.msg_block_->duplicate ()), + missing_data_ (qd.missing_data_), + byte_order_ (qd.byte_order_), + major_version_ (qd.major_version_), + minor_version_ (qd.minor_version_), + more_fragments_ (qd.more_fragments_), + msg_type_ (qd.msg_type_), + next_ (0) +{ +} diff --git a/TAO/tao/Incoming_Message_Queue.h b/TAO/tao/Incoming_Message_Queue.h index 20fbdc17d04..6037ca825af 100644 --- a/TAO/tao/Incoming_Message_Queue.h +++ b/TAO/tao/Incoming_Message_Queue.h @@ -78,6 +78,10 @@ public: int is_tail_complete (void); int is_head_complete (void); + /// This method checks whether the last message that was queued up + /// was fragmented... + int is_tail_fragmented (void); + /// Return the size of data that is missing in tail of the queue. size_t missing_data_tail (void) const; /// void missing_data (size_t data); @@ -123,11 +127,18 @@ public: /// Constructor. TAO_Queued_Data (ACE_Message_Block *mb); + /// Copy constructor. + TAO_Queued_Data (const TAO_Queued_Data &qd); + /// Creation and deletion of a node in the queue. static TAO_Queued_Data* get_queued_data (void); static void release (TAO_Queued_Data *qd); + /// Duplicate ourselves. This creates a copy of ourselves on the + /// heap and returns a pointer to the duplicated node. + static TAO_Queued_Data* duplicate (TAO_Queued_Data &qd); + /// The message block that contains the message. ACE_Message_Block *msg_block_; @@ -142,9 +153,14 @@ public: /// information that would be needed to read and decipher the /// message. CORBA::Octet major_version_; - CORBA::Octet minor_version_; + /// Some messages can be fragmented by the protocol (this is an ORB + /// level fragmentation on top of the TCP/IP fragmentation. This + /// member indicates whether the message that we have recd. and + /// queue already has more fragments that is missing.. + CORBA::Octet more_fragments_; + /// The message type of the message TAO_Pluggable_Message_Type msg_type_; diff --git a/TAO/tao/Incoming_Message_Queue.inl b/TAO/tao/Incoming_Message_Queue.inl index 13ee962af2f..df61432d461 100644 --- a/TAO/tao/Incoming_Message_Queue.inl +++ b/TAO/tao/Incoming_Message_Queue.inl @@ -3,6 +3,7 @@ /************************************************************************/ // Methods for TAO_Queued_Data /************************************************************************/ +/*static*/ ACE_INLINE TAO_Queued_Data * TAO_Queued_Data::get_queued_data (void) { @@ -15,6 +16,7 @@ TAO_Queued_Data::get_queued_data (void) return qd; } +/*static*/ ACE_INLINE void TAO_Queued_Data::release (TAO_Queued_Data *qd) { @@ -24,6 +26,23 @@ TAO_Queued_Data::release (TAO_Queued_Data *qd) delete qd; } +/*static*/ +ACE_INLINE TAO_Queued_Data * +TAO_Queued_Data::duplicate (TAO_Queued_Data &sqd) +{ + // @@TODO: Use the global pool for allocationg... + TAO_Queued_Data *qd = 0; + ACE_NEW_RETURN (qd, + TAO_Queued_Data (sqd), + 0); + + return qd; +} + +/************************************************************************/ +// Methods for TAO_Incoming_Message_Queue +/************************************************************************/ + ACE_INLINE CORBA::ULong TAO_Incoming_Message_Queue::queue_length (void) { @@ -38,7 +57,8 @@ TAO_Incoming_Message_Queue::is_tail_complete (void) return -1; if (this->size_ && - this->queued_data_->missing_data_ == 0) + this->queued_data_->missing_data_ == 0 && + this->queued_data_->more_fragments_ == 0) return 1; return 0; @@ -51,7 +71,21 @@ TAO_Incoming_Message_Queue::is_head_complete (void) return -1; if (this->size_ && - this->queued_data_->next_->missing_data_ == 0) + this->queued_data_->next_->missing_data_ == 0 && + this->queued_data_->more_fragments_ == 0) + return 1; + + return 0; +} + +ACE_INLINE int +TAO_Incoming_Message_Queue::is_tail_fragmented (void) +{ + if (this->size_ == 0) + return 0; + + if (this->size_ && + this->queued_data_->more_fragments_ == 0) return 1; return 0; diff --git a/TAO/tao/Makefile b/TAO/tao/Makefile index 61d72f65cf9..9b9dd8ea637 100644 --- a/TAO/tao/Makefile +++ b/TAO/tao/Makefile @@ -108,6 +108,7 @@ PLUGGABLE_MESSAGING_FILES = \ Pluggable_Messaging \ Pluggable_Messaging_Utils \ GIOP_Message_Base \ + GIOP_Message_Lite \ GIOP_Message_Generator_Parser \ GIOP_Message_Generator_Parser_10 \ GIOP_Message_Generator_Parser_11 \ diff --git a/TAO/tao/Pluggable_Messaging.h b/TAO/tao/Pluggable_Messaging.h index db21cc55df8..dd7ff672518 100644 --- a/TAO/tao/Pluggable_Messaging.h +++ b/TAO/tao/Pluggable_Messaging.h @@ -134,6 +134,10 @@ public: virtual int consolidate_node (TAO_Queued_Data *qd, ACE_Message_Block &incoming) = 0; + /// @@Bala:Docu?? + virtual int consolidate_fragments (TAO_Queued_Data *dqd, + const TAO_Queued_Data *sqd) = 0; + /// Parse the request message, make an upcall and send the reply back /// to the "request initiator" virtual int process_request_message (TAO_Transport *transport, @@ -158,7 +162,7 @@ public: virtual int is_ready_for_bidirectional (void) = 0; /// Reset the messaging the object - virtual void reset (int reset_flag = 1) = 0; + virtual void reset (void) = 0; }; #if defined (__ACE_INLINE__) diff --git a/TAO/tao/Strategies/DIOP_Transport.cpp b/TAO/tao/Strategies/DIOP_Transport.cpp index 8ed6aa9b4a6..eef853367fb 100644 --- a/TAO/tao/Strategies/DIOP_Transport.cpp +++ b/TAO/tao/Strategies/DIOP_Transport.cpp @@ -20,7 +20,7 @@ #include "tao/debug.h" #include "tao/Resume_Handle.h" #include "tao/GIOP_Message_Base.h" -// #include "tao/GIOP_Message_Lite.h" +#include "tao/GIOP_Message_Lite.h" #if !defined (__ACE_INLINE__) # include "DIOP_Transport.i" @@ -30,7 +30,7 @@ ACE_RCSID (tao, DIOP_Transport, "$Id$") TAO_DIOP_Transport::TAO_DIOP_Transport (TAO_DIOP_Connection_Handler *handler, TAO_ORB_Core *orb_core, - CORBA::Boolean /*flag*/) + CORBA::Boolean flag) : TAO_Transport (TAO_TAG_UDP_PROFILE, orb_core) , connection_handler_ (handler) @@ -38,10 +38,20 @@ TAO_DIOP_Transport::TAO_DIOP_Transport (TAO_DIOP_Connection_Handler *handler, { // @@ Michael: Set the input CDR size to ACE_MAX_DGRAM_SIZE so that // we read the whole UDP packet on a single read. - - ACE_NEW (this->messaging_object_, - TAO_GIOP_Message_Base (orb_core, - ACE_MAX_DGRAM_SIZE)); + if (flag) + { + // Use the lite version of the protocol + ACE_NEW (this->messaging_object_, + TAO_GIOP_Message_Lite (orb_core, + ACE_MAX_DGRAM_SIZE)); + } + else + { + // Use the normal GIOP object + ACE_NEW (this->messaging_object_, + TAO_GIOP_Message_Base (orb_core, + ACE_MAX_DGRAM_SIZE)); + } } TAO_DIOP_Transport::~TAO_DIOP_Transport (void) diff --git a/TAO/tao/Strategies/SHMIOP_Transport.cpp b/TAO/tao/Strategies/SHMIOP_Transport.cpp index 0b333e0c566..6284a64ad71 100644 --- a/TAO/tao/Strategies/SHMIOP_Transport.cpp +++ b/TAO/tao/Strategies/SHMIOP_Transport.cpp @@ -17,6 +17,7 @@ #include "tao/debug.h" #include "tao/Resume_Handle.h" #include "tao/GIOP_Message_Base.h" +#include "tao/GIOP_Message_Lite.h" #if !defined (__ACE_INLINE__) @@ -27,13 +28,12 @@ ACE_RCSID (Strategies, SHMIOP_Transport, "$Id$") TAO_SHMIOP_Transport::TAO_SHMIOP_Transport (TAO_SHMIOP_Connection_Handler *handler, TAO_ORB_Core *orb_core, - CORBA::Boolean /*flag*/) + CORBA::Boolean flag) : TAO_Transport (TAO_TAG_SHMEM_PROFILE, orb_core), connection_handler_ (handler), messaging_object_ (0) { -#if 0 if (flag) { // Use the lite version of the protocol @@ -41,7 +41,6 @@ TAO_SHMIOP_Transport::TAO_SHMIOP_Transport (TAO_SHMIOP_Connection_Handler *handl TAO_GIOP_Message_Lite (orb_core)); } else -#endif /*#if 0 */ { // Use the normal GIOP object ACE_NEW (this->messaging_object_, diff --git a/TAO/tao/Strategies/UIOP_Transport.cpp b/TAO/tao/Strategies/UIOP_Transport.cpp index 5577150de4a..2baf994c1ac 100644 --- a/TAO/tao/Strategies/UIOP_Transport.cpp +++ b/TAO/tao/Strategies/UIOP_Transport.cpp @@ -16,7 +16,7 @@ #include "tao/ORB_Core.h" #include "tao/debug.h" #include "tao/GIOP_Message_Base.h" -//#include "tao/GIOP_Message_Lite.h" +#include "tao/GIOP_Message_Lite.h" #if !defined (__ACE_INLINE__) # include "UIOP_Transport.i" @@ -27,19 +27,19 @@ ACE_RCSID (Strategies, UIOP_Transport, "$Id$") TAO_UIOP_Transport::TAO_UIOP_Transport (TAO_UIOP_Connection_Handler *handler, TAO_ORB_Core *orb_core, - CORBA::Boolean /*flag*/) + CORBA::Boolean flag) : TAO_Transport (TAO_TAG_UIOP_PROFILE, orb_core) , connection_handler_ (handler) , messaging_object_ (0) { - /* if (flag) + if (flag) { // Use the lite version of the protocol ACE_NEW (this->messaging_object_, TAO_GIOP_Message_Lite (orb_core)); } - else*/ + else { // Use the normal GIOP object ACE_NEW (this->messaging_object_, diff --git a/TAO/tao/TAO.dsp b/TAO/tao/TAO.dsp index 62572777b23..b319103ab57 100644 --- a/TAO/tao/TAO.dsp +++ b/TAO/tao/TAO.dsp @@ -411,6 +411,10 @@ SOURCE=.\GIOP_Message_Generator_Parser_Impl.cpp # End Source File
# Begin Source File
+SOURCE=.\GIOP_Message_Lite.cpp
+# End Source File
+# Begin Source File
+
SOURCE=.\GIOP_Message_Locate_Header.cpp
# End Source File
# Begin Source File
@@ -1143,6 +1147,10 @@ SOURCE=.\GIOP_Message_Generator_Parser_Impl.h # End Source File
# Begin Source File
+SOURCE=.\GIOP_Message_Lite.h
+# End Source File
+# Begin Source File
+
SOURCE=.\GIOP_Message_Locate_Header.h
# End Source File
# Begin Source File
@@ -1859,6 +1867,10 @@ SOURCE=.\GIOP_Message_Generator_Parser_Impl.inl # End Source File
# Begin Source File
+SOURCE=.\GIOP_Message_Lite.i
+# End Source File
+# Begin Source File
+
SOURCE=.\GIOP_Message_Locate_Header.i
# End Source File
# Begin Source File
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp index a91348edf7a..09212a06a5b 100644 --- a/TAO/tao/Transport.cpp +++ b/TAO/tao/Transport.cpp @@ -2,6 +2,7 @@ // $Id$ + #include "Transport.h" #include "Exception.h" @@ -888,6 +889,23 @@ TAO_Transport::handle_input_i (TAO_Resume_Handle &rh, // Extract the data for the node.. this->messaging_object ()->get_message_data (&qd); + // Check whether the message was fragmented.. + if (qd.more_fragments_ || + (qd.msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)) + { + // Make a copy of the message that we have + ACE_Data_Block *ndb = + message_block.data_block ()->clone (); + + // Replace the underlying the datablock + message_block.replace_data_block (ndb); + + // Duplicate the node that we have as the node is on stack.. + TAO_Queued_Data *nqd = TAO_Queued_Data::duplicate (qd); + + return this->consolidate_fragments (nqd, rh); + } + // Resume before starting to process the request.. rh.resume_handle (); @@ -908,6 +926,7 @@ TAO_Transport::parse_consolidate_messages (ACE_Message_Block &block, // Check whether we have a complete message for processing ssize_t missing_data = this->missing_data (block); + if (missing_data < 0) { // If we have more than one message @@ -1070,6 +1089,17 @@ TAO_Transport::consolidate_message (ACE_Message_Block &incoming, pqd.missing_data_ = missing_data; this->messaging_object ()->get_message_data (&pqd); + // Check whether the message was fragmented and try to consolidate + // the fragments.. + if (pqd.more_fragments_ || + (pqd.msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)) + { + // Duplicate the queued data as it is on stack.. + TAO_Queued_Data *nqd = TAO_Queued_Data::duplicate (pqd); + + return this->consolidate_fragments (nqd, rh); + } + // Resume the handle before processing the request rh.resume_handle (); @@ -1079,6 +1109,45 @@ TAO_Transport::consolidate_message (ACE_Message_Block &incoming, } int +TAO_Transport::consolidate_fragments (TAO_Queued_Data *qd, + TAO_Resume_Handle &rh) +{ + // If we have received a fragment message then we have to + // consolidate <qd> with the last message in queue + // @@todo: this piece of logic follows GIOP a bit... Need to revisit + // if we have protocols other than GIOP + + // @@todo: Fragments now have too much copying overhead. Need to get + // them out if we want to have some reasonable performance metrics + // in future.. Post 1.2 seems a nice time.. + if (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT) + { + TAO_Queued_Data *tqd = + this->incoming_message_queue_.dequeue_tail (); + + tqd->more_fragments_ = qd->more_fragments_; + + if (this->messaging_object ()->consolidate_fragments (tqd, + qd) == -1) + return -1; + + TAO_Queued_Data::release (qd); + + this->incoming_message_queue_.enqueue_tail (tqd); + + this->process_queue_head (rh); + } + else + { + // if we dont have a fragment already in the queue just add it in + // the queue + this->incoming_message_queue_.enqueue_tail (qd); + } + + return 0; +} + +int TAO_Transport::consolidate_message_queue (ACE_Message_Block &incoming, ssize_t missing_data, TAO_Resume_Handle &rh, @@ -1241,7 +1310,18 @@ TAO_Transport::consolidate_extra_messages (ACE_Message_Block this->messaging_object ()->extract_next_message (incoming, q_data); if (q_data) - this->incoming_message_queue_.enqueue_tail (q_data); + { + // If we have read a framented message then... + if (q_data->more_fragments_ || + q_data->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT) + { + this->consolidate_fragments (q_data, rh); + } + else + { + this->incoming_message_queue_.enqueue_tail (q_data); + } + } } // In case of error return.. diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h index 009ce9025ed..6405bbd50f8 100644 --- a/TAO/tao/Transport.h +++ b/TAO/tao/Transport.h @@ -636,6 +636,12 @@ protected: TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time); + /// @@Bala: Docu??? + int consolidate_fragments (TAO_Queued_Data *qd, + TAO_Resume_Handle &rh); + + + /// First consolidate the message queue. If the message is still not /// complete, try to read from the handle again to make it /// complete. If these dont help put the message back in the queue diff --git a/TAO/tao/Transport.inl b/TAO/tao/Transport.inl index b53fc80a7d7..06411ff3179 100644 --- a/TAO/tao/Transport.inl +++ b/TAO/tao/Transport.inl @@ -1,3 +1,4 @@ +// -*- C++ -*- // $Id$ ACE_INLINE CORBA::ULong @@ -83,7 +84,7 @@ TAO_Transport::purging_order (void) const { return this->purging_order_; } - + ACE_INLINE void TAO_Transport::purging_order (unsigned long value) { diff --git a/TAO/tao/orbconf.h b/TAO/tao/orbconf.h index 12336939a76..52d26fc85f9 100644 --- a/TAO/tao/orbconf.h +++ b/TAO/tao/orbconf.h @@ -279,6 +279,11 @@ // Define if your processor does not store words with the most significant // byte first. + +// @todo: It seems to be that this definition of TAO_ENCAP_BYTE_ORDER +// should be removed. We have an equivalent ACE definition in +// ACE_CDR_BYTE_ORDER. Today both of them are consistent. It would be +// a havoc if oneday this consistency is gone.. #if defined (ACE_LITTLE_ENDIAN) # define TAO_ENCAP_BYTE_ORDER 1 /* little endian encapsulation byte order has the value = 1 */ @@ -928,6 +933,14 @@ enum TAO_Policy_Scope #define TAO_DEF_GIOP_MINOR 2 #endif /* TAO_DEF_GIOP_MINOR */ +#if !defined (TAO_CONNECTION_HANDLER_STACK_BUF_SIZE) +# define TAO_CONNECTION_HANDLER_STACK_BUF_SIZE 1024 +#endif /*TAO_CONNECTION_HANDLER_STACK_BUF_SIZE */ + +#if !defined (TAO_RESUMES_CONNECTION_HANDLER) +# define TAO_RESUMES_CONNECTION_HANDLER 1 +#endif /*TAO_RESUMES_CONNECTION_HANDLER*/ + // By default TAO generate the OMG standard profile components // (ORB_TYPE and CODE_SETS) #define TAO_STD_PROFILE_COMPONENTS |