From a7e10f0b431dc0e2006196751782cd8fa40b16c7 Mon Sep 17 00:00:00 2001 From: bala Date: Sun, 14 Dec 2003 16:03:49 +0000 Subject: ChangeLogTag:Sun Dec 14 09:56:37 2003 Balachandran Natarajan --- TAO/ChangeLog | 29 + TAO/tao/GIOP_Message_Base.cpp | 353 ++++++- TAO/tao/GIOP_Message_Base.h | 52 +- TAO/tao/GIOP_Message_Base.i | 8 - TAO/tao/GIOP_Message_Generator_Parser_Impl.inl | 23 +- TAO/tao/GIOP_Message_Lite.cpp | 57 +- TAO/tao/GIOP_Message_Lite.h | 45 +- TAO/tao/GIOP_Message_State.cpp | 239 ++--- TAO/tao/GIOP_Message_State.h | 44 +- TAO/tao/GIOP_Message_State.inl | 31 +- TAO/tao/IIOP_Transport.cpp | 2 +- TAO/tao/Incoming_Message_Queue.cpp | 459 ++------- TAO/tao/Incoming_Message_Queue.h | 162 +--- TAO/tao/Incoming_Message_Queue.inl | 23 +- TAO/tao/Pluggable_Messaging.h | 44 +- TAO/tao/Strategies/DIOP_Transport.cpp | 229 +---- TAO/tao/Strategies/SHMIOP_Transport.cpp | 2 - TAO/tao/Strategies/SHMIOP_Transport.h | 4 - TAO/tao/Transport.cpp | 1208 ++++++++++++------------ TAO/tao/Transport.h | 104 +- 20 files changed, 1357 insertions(+), 1761 deletions(-) diff --git a/TAO/ChangeLog b/TAO/ChangeLog index 3cc88f2f838..1a42e91533a 100644 --- a/TAO/ChangeLog +++ b/TAO/ChangeLog @@ -1,3 +1,32 @@ +Sun Dec 14 09:56:37 2003 Balachandran Natarajan + + * tao/GIOP_Message_Base.cpp: + * tao/GIOP_Message_Base.h: + * tao/GIOP_Message_Base.i: + * tao/GIOP_Message_Generator_Parser_Impl.inl: + * tao/GIOP_Message_Lite.cpp: + * tao/GIOP_Message_Lite.h: + * tao/GIOP_Message_State.cpp: + * tao/GIOP_Message_State.h: + * tao/GIOP_Message_State.inl: + * tao/IIOP_Transport.cpp: + * tao/Incoming_Message_Queue.cpp: + * tao/Incoming_Message_Queue.h: + * tao/Incoming_Message_Queue.inl: + * tao/Pluggable_Messaging.h: + * tao/Transport.cpp: + * tao/Transport.h: + * tao/Strategies/DIOP_Transport.cpp: + * tao/Strategies/SHMIOP_Transport.cpp: + * tao/Strategies/SHMIOP_Transport.h: + + Reverting the change "Wed Dec 10 13:58:41 2003 Chris Cleeland + " for PMB fixes. The fixes were right, + but it slowed down the ORB considerably due to the two + allocations along the critical path. We will try to get these + fixes in later once we have a way to get around the + allocations. + Sun Dec 14 08:47:03 2003 Balachandran Natarajan * tests/Bug_1639_Regression/test.mpc: diff --git a/TAO/tao/GIOP_Message_Base.cpp b/TAO/tao/GIOP_Message_Base.cpp index 7ada6c903ee..76aeeaee51e 100644 --- a/TAO/tao/GIOP_Message_Base.cpp +++ b/TAO/tao/GIOP_Message_Base.cpp @@ -24,7 +24,7 @@ TAO_GIOP_Message_Base::TAO_GIOP_Message_Base (TAO_ORB_Core *orb_core, size_t /*input_cdr_size*/) : orb_core_ (orb_core) , message_state_ (orb_core, - this) + this) , out_stream_ (this->buffer_, sizeof this->buffer_, /* ACE_CDR::DEFAULT_BUFSIZE */ TAO_ENCAP_BYTE_ORDER, @@ -295,7 +295,6 @@ TAO_GIOP_Message_Base::format_message (TAO_OutputCDR &stream) TAO_Pluggable_Message_Type TAO_GIOP_Message_Base::message_type ( TAO_GIOP_Message_State &msg_state) - const { // Convert to the right type of Pluggable Messaging message type. @@ -329,6 +328,264 @@ TAO_GIOP_Message_Base::message_type ( return TAO_PLUGGABLE_MESSAGE_MESSAGERROR; } +int +TAO_GIOP_Message_Base::parse_incoming_messages (ACE_Message_Block &incoming) +{ + + if (this->message_state_.parse_message_header (incoming) == -1) + { + return -1; + } + + return 0; +} + +ssize_t +TAO_GIOP_Message_Base::missing_data (ACE_Message_Block &incoming) +{ + // Actual message size including the header.. + CORBA::ULong msg_size = + this->message_state_.message_size (); + + size_t len = incoming.length (); + + // If we have too many messages or if we have less than even a size + // of the GIOP header then .. + if (len > msg_size || + len < TAO_GIOP_MESSAGE_HEADER_LEN) + { + return -1; + } + else if (len == msg_size) + return 0; + + return msg_size - len; +} + + +int +TAO_GIOP_Message_Base::extract_next_message (ACE_Message_Block &incoming, + TAO_Queued_Data *&qd) +{ + TAO_GIOP_Message_State state (this->orb_core_, + this); + + if (incoming.length () < TAO_GIOP_MESSAGE_HEADER_LEN) + { + if (incoming.length () > 0) + { + // Make a node which has a message block of the size of + // MESSAGE_HEADER_LEN. + qd = + this->make_queued_data (TAO_GIOP_MESSAGE_HEADER_LEN); + + qd->msg_block_->copy (incoming.rd_ptr (), + incoming.length ()); + qd->missing_data_ = -1; + } + return 0; + } + + if (state.parse_message_header (incoming) == -1) + { + return -1; + } + + size_t copying_len = state.message_size (); + + qd = this->make_queued_data (copying_len); + + if (copying_len > incoming.length ()) + { + qd->missing_data_ = + copying_len - incoming.length (); + + copying_len = incoming.length (); + } + + qd->msg_block_->copy (incoming.rd_ptr (), + copying_len); + + incoming.rd_ptr (copying_len); + qd->byte_order_ = state.byte_order_; + qd->major_version_ = state.giop_version_.major; + qd->minor_version_ = state.giop_version_.minor; + qd->msg_type_ = this->message_type (state); + return 1; +} + +int +TAO_GIOP_Message_Base::consolidate_node (TAO_Queued_Data *qd, + ACE_Message_Block &incoming) +{ + // Look to see whether we had atleast parsed the GIOP header ... + if (qd->missing_data_ == -1) + { + // The data length that has been stuck in there during the last + // read .... + size_t len = + qd->msg_block_->length (); + + // We know that we would have space for + // TAO_GIOP_MESSAGE_HEADER_LEN here. So copy that much of data + // from the into the message block in + qd->msg_block_->copy (incoming.rd_ptr (), + TAO_GIOP_MESSAGE_HEADER_LEN - len); + + // Move the rd_ptr () in the incoming message block.. + incoming.rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN - len); + + TAO_GIOP_Message_State state (this->orb_core_, + this); + + // Parse the message header now... + if (state.parse_message_header (*qd->msg_block_) == -1) + return -1; + + // Now grow the message block so that we can copy the rest of + // the data... + if (qd->msg_block_->space () < state.message_size ()) + { + ACE_CDR::grow (qd->msg_block_, + state.message_size ()); + } + + // Copy the pay load.. + // Calculate the bytes that needs to be copied in the queue... + size_t copy_len = + state.payload_size (); + + // If the data that needs to be copied is more than that is + // available to us .. + if (copy_len > incoming.length ()) + { + // Calculate the missing data.. + qd->missing_data_ = + copy_len - incoming.length (); + + // Set the actual possible copy_len that is available... + copy_len = incoming.length (); + } + else + { + qd->missing_data_ = 0; + } + + // ..now we are set to copy the right amount of data to the + // node.. + qd->msg_block_->copy (incoming.rd_ptr (), + copy_len); + + // Set the of the .. + incoming.rd_ptr (copy_len); + + // Get the other details... + qd->byte_order_ = state.byte_order_; + qd->major_version_ = state.giop_version_.major; + qd->minor_version_ = state.giop_version_.minor; + qd->msg_type_ = this->message_type (state); + } + else + { + // @@todo: Need to abstract this out to a seperate method... + size_t copy_len = qd->missing_data_; + + if (copy_len > incoming.length ()) + { + // Calculate the missing data.. + qd->missing_data_ = + copy_len - incoming.length (); + + // Set the actual possible copy_len that is available... + copy_len = incoming.length (); + } + + // Copy the right amount of data in to the node... + // node.. + qd->msg_block_->copy (incoming.rd_ptr (), + copy_len); + + // Set the of the .. + qd->msg_block_->rd_ptr (copy_len); + + } + + + return 0; +} + + +int +TAO_GIOP_Message_Base::consolidate_fragments (TAO_Queued_Data *dqd, + const TAO_Queued_Data *sqd) +{ + if (dqd->byte_order_ != sqd->byte_order_ + || dqd->major_version_ != sqd->major_version_ + || dqd->minor_version_ != sqd->minor_version_) + { + // Yes, print it out in all debug levels!. This is an error by + // CORBA 2.4 spec + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) incompatible fragments:") + ACE_TEXT ("different GIOP versions or byte order\n"))); + return -1; + } + + // Skip the header in the incoming message + sqd->msg_block_->rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN); + + // If we have a fragment header skip the header length too.. + if (sqd->minor_version_ == 2 && + sqd->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT) + sqd->msg_block_->rd_ptr (TAO_GIOP_MESSAGE_FRAGMENT_HEADER); + + // Get the length of the incoming message block.. + size_t incoming_length = + sqd->msg_block_->length (); + + // Increase the size of the destination message block if we need + // to. + ACE_Message_Block *mb = + dqd->msg_block_; + + // Check space before growing. + if (mb->space () < incoming_length) + { + ACE_CDR::grow (mb, + mb->length () + incoming_length); + } + + // Copy the data + dqd->msg_block_->copy (sqd->msg_block_->rd_ptr (), + incoming_length); + return 0; +} + +void +TAO_GIOP_Message_Base::get_message_data (TAO_Queued_Data *qd) +{ + // Get the message information + qd->byte_order_ = + this->message_state_.byte_order_; + qd->major_version_ = + this->message_state_.giop_version_.major; + qd->minor_version_ = + this->message_state_.giop_version_.minor; + + //qd->more_fragments_ = this->message_state_.more_fragments_; + + if (this->message_state_.more_fragments_) + qd->more_fragments_ = 1; + else + qd->more_fragments_ = 0; + + qd->msg_type_= + this->message_type (this->message_state_); + + // Reset the message_state + this->message_state_.reset (); +} + int TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport, TAO_Queued_Data *qd) @@ -507,13 +764,13 @@ TAO_GIOP_Message_Base::process_reply_message ( // Should be taken care by the state specific parsing retval = generator_parser->parse_reply (input_cdr, - params); + params); break; case TAO_PLUGGABLE_MESSAGE_LOCATEREPLY: retval = generator_parser->parse_locate_reply (input_cdr, - params); + params); break; default: retval = -1; @@ -641,9 +898,7 @@ TAO_GIOP_Message_Base::process_request (TAO_Transport *transport, parse_error = parser->parse_request_header (request); - TAO_Codeset_Manager *csm = request.orb_core()->codeset_manager(); - if (csm) - csm->process_service_context(request); + request.orb_core()->codeset_manager()->process_service_context(request); transport->assign_translators(&cdr,&output); // Throw an exception if the @@ -843,9 +1098,9 @@ TAO_GIOP_Message_Base::process_locate_request (TAO_Transport *transport, } TAO::ObjectKey tmp_key (locate_request.object_key ().length (), - locate_request.object_key ().length (), - locate_request.object_key ().get_buffer (), - 0); + locate_request.object_key ().length (), + locate_request.object_key ().get_buffer (), + 0); // Set it to an error state parse_error = 1; @@ -1069,7 +1324,6 @@ TAO_GIOP_Message_Base::set_state ( } -#if 0 // Server sends an "I'm shutting down now, any requests you've sent me // can be retried" message to the server. The message is prefab, for // simplicity. @@ -1165,7 +1419,7 @@ TAO_GIOP_Message_Base:: transport-> id ())); } -#endif + int TAO_GIOP_Message_Base::send_reply_exception ( @@ -1322,49 +1576,44 @@ TAO_GIOP_Message_Base::is_ready_for_bidirectional (TAO_OutputCDR &msg) } -void -TAO_GIOP_Message_Base::set_queued_data_from_message_header ( - TAO_Queued_Data *qd, - const ACE_Message_Block &mb - ) const +TAO_Queued_Data * +TAO_GIOP_Message_Base::make_queued_data (size_t sz) { - // @@CJC: Try leaving out the declaration for this->message_state_ - // and see what pukes. I don't think we need it any more. - TAO_GIOP_Message_State state; - if (state.take_values_from_message_block (mb) == -1) - { - // what the heck do we do here?! - qd->current_state_ = TAO_Queued_Data::INVALID; - return; - } + // Get a node for the queue.. + TAO_Queued_Data *qd = + TAO_Queued_Data::get_queued_data ( + this->orb_core_->transport_message_buffer_allocator ()); - // It'd be nice to have an abstract base for GIOP_Message_State - // so that there could just be a line like: - // qd->take_values_from (state); - // Get the message information - qd->byte_order_ = state.byte_order (); - qd->major_version_ = state.giop_version ().major; - qd->minor_version_ = state.giop_version ().minor; - qd->more_fragments_ = state.more_fragments () ? 1 : 0; - qd->request_id_ = state.request_id_; - qd->msg_type_= message_type (state); - qd->missing_data_bytes_ = state.payload_size (); + // @@todo: We have a similar method in Transport.cpp. Need to see how + // we can factor them out.. + // Make a datablock for the size requested + something. The + // "something" is required because we are going to align the data + // block in the message block. During alignment we could loose some + // bytes. As we may not know how many bytes will be lost, we will + // allocate ACE_CDR::MAX_ALIGNMENT extra. + ACE_Data_Block *db = + this->orb_core_->create_input_cdr_data_block (sz + + ACE_CDR::MAX_ALIGNMENT); + + ACE_Allocator *alloc = + this->orb_core_->input_cdr_msgblock_allocator (); + + ACE_Message_Block mb (db, + 0, + alloc); + + ACE_Message_Block *new_mb = mb.duplicate (); + + ACE_CDR::mb_align (new_mb); + + qd->msg_block_ = new_mb; + + + return qd; } -int -TAO_GIOP_Message_Base::check_for_valid_header ( - const ACE_Message_Block &mb - ) const +size_t +TAO_GIOP_Message_Base::header_length (void) const { - // NOTE! We don't hardcode the length of the header b/c header_length should - // be eligible for inlining by pretty much any compiler, and it should return - // a constant. The rest of this method is hard-coded and hand-optimized because - // this method gets called A LOT. - if (mb.length () < this->header_length ()) - return -1; - - // Is finding that it's the right length and the magic bytes present - // enough to declare it a valid header? I think so... - register const char* h = mb.rd_ptr (); - return (h[0] == 'G' && h[1] == 'I' && h[2] == 'O' && h[3] == 'P') ? 1 : 0; + return TAO_GIOP_MESSAGE_HEADER_LEN; } diff --git a/TAO/tao/GIOP_Message_Base.h b/TAO/tao/GIOP_Message_Base.h index 7bf61e340f7..461673bd359 100644 --- a/TAO/tao/GIOP_Message_Base.h +++ b/TAO/tao/GIOP_Message_Base.h @@ -94,37 +94,35 @@ public: /// the message. virtual int format_message (TAO_OutputCDR &cdr); - /// Process the request message that we have received on the - /// connection - virtual int process_request_message (TAO_Transport *transport, - TAO_Queued_Data *qd); - - /*! - \brief Inspects the bytes in \param mb to see if they "look like" the beginning of a message. + /// Parse the incoming messages.. + virtual int parse_incoming_messages (ACE_Message_Block &message_block); - Inspects the bytes in \param mb, beginning at \code mb.rd_ptr, to - see if they look like the beginning of a message. If \code mb does not - contain less than \code header_length() bytes, this method cannot make a - complete evaluation, and returns a commensurate value. + /// Calculate the amount of data that is missing in the + /// message block. + virtual ssize_t missing_data (ACE_Message_Block &message_block); - \return 1 \code header_length() bytes found, and constitute a valid header - \return 0 \code header_length() bytes found, and do not constitute a valid header - \return -1 not enough bytes available to make a determination of header validity + /* Extract the details of the next message from the + * through . Returns 1 if there are more messages and returns a + * 0 if there are no more messages in . */ - virtual int check_for_valid_header (const ACE_Message_Block &mb) const; + virtual int extract_next_message (ACE_Message_Block &incoming, + TAO_Queued_Data *&qd); - /*! - \brief Set fields in \param qd based on values derived from \param mb. + /// Check whether the node needs consolidation from + virtual int consolidate_node (TAO_Queued_Data *qd, + ACE_Message_Block &incoming); - This function sets fields in \param qd based on values derived - from \param mb. It assumes that if the length of \param mb is - enough to hold a header, then the data in there can be trusted to - make sense. - */ - virtual void set_queued_data_from_message_header ( - TAO_Queued_Data *, - const ACE_Message_Block &mb) const; + /// Get the details of the message parsed through the . + virtual void get_message_data (TAO_Queued_Data *qd); + /// @@Bala:Docu?? + virtual int consolidate_fragments (TAO_Queued_Data *dqd, + const TAO_Queued_Data *sqd); + + /// Process the request message that we have received on the + /// connection + virtual int process_request_message (TAO_Transport *transport, + TAO_Queued_Data *qd); /// Parse the reply message that we received and return the reply @@ -176,7 +174,7 @@ protected: /// TAO_PLUGGABLE_MESSAGE_REPLY, /// TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION, /// TAO_PLUGGABLE_MESSAGE_MESSAGE_ERROR. - TAO_Pluggable_Message_Type message_type (TAO_GIOP_Message_State &state) const; + TAO_Pluggable_Message_Type message_type (TAO_GIOP_Message_State &state); private: @@ -199,12 +197,10 @@ private: /// Send error messages int send_error (TAO_Transport *transport); -#if 0 /// Close a connection, first sending GIOP::CloseConnection. void send_close_connection (const TAO_GIOP_Message_Version &version, TAO_Transport *transport, void *ctx); -#endif /// We must send a LocateReply through , this request /// resulted in some kind of exception. diff --git a/TAO/tao/GIOP_Message_Base.i b/TAO/tao/GIOP_Message_Base.i index f5e39d9aa54..a589447a413 100644 --- a/TAO/tao/GIOP_Message_Base.i +++ b/TAO/tao/GIOP_Message_Base.i @@ -4,11 +4,3 @@ // // GIOP_Message_Base // - - -ACE_INLINE size_t -TAO_GIOP_Message_Base::header_length (void) const -{ - return TAO_GIOP_MESSAGE_HEADER_LEN; -} - diff --git a/TAO/tao/GIOP_Message_Generator_Parser_Impl.inl b/TAO/tao/GIOP_Message_Generator_Parser_Impl.inl index 47e4730befb..18bb7936ffd 100644 --- a/TAO/tao/GIOP_Message_Generator_Parser_Impl.inl +++ b/TAO/tao/GIOP_Message_Generator_Parser_Impl.inl @@ -5,24 +5,9 @@ TAO_GIOP_Message_Generator_Parser_Impl:: check_revision (CORBA::Octet incoming_major, CORBA::Octet incoming_minor) { - CORBA::UShort version_as_whole_num = incoming_major << 8 | incoming_minor; - CORBA::UShort max_allowable_version = TAO_DEF_GIOP_MAJOR << 8 | TAO_DEF_GIOP_MINOR; - - CORBA::Boolean ret = 0; - - // If it's greater than the max, we know it's not allowed. - if (version_as_whole_num > max_allowable_version) + if (incoming_major > TAO_DEF_GIOP_MAJOR || + incoming_minor > TAO_DEF_GIOP_MINOR) return 0; - - // If it's less than the max, though, we still have to check for - // each explicit version and only allow the ones we know work. - switch (version_as_whole_num) - { - case 0x0100: - case 0x0101: - case 0x0102: - ret = 1; - } - - return ret; + + return 1; } diff --git a/TAO/tao/GIOP_Message_Lite.cpp b/TAO/tao/GIOP_Message_Lite.cpp index 3ff0683729a..462266dd9bf 100644 --- a/TAO/tao/GIOP_Message_Lite.cpp +++ b/TAO/tao/GIOP_Message_Lite.cpp @@ -243,7 +243,6 @@ TAO_GIOP_Message_Lite::format_message (TAO_OutputCDR &stream) } -#if 0 int TAO_GIOP_Message_Lite::parse_incoming_messages (ACE_Message_Block &block) { @@ -502,7 +501,6 @@ TAO_GIOP_Message_Lite::consolidate_fragments (TAO_Queued_Data * /*dqd*/, // We dont know what fragments are??? return -1; } -#endif int TAO_GIOP_Message_Lite::process_request_message (TAO_Transport *transport, @@ -730,9 +728,7 @@ TAO_GIOP_Message_Lite::process_request (TAO_Transport *transport, parse_error = this->parse_request_header (request); - TAO_Codeset_Manager *csm = request.orb_core()->codeset_manager(); - if (csm) - csm->process_service_context(request); + request.orb_core()->codeset_manager()->process_service_context(request); transport->assign_translators(&cdr,&output); // Throw an exception if the @@ -1630,6 +1626,38 @@ TAO_GIOP_Message_Lite::dump_msg (const char *label, } } +TAO_Queued_Data * +TAO_GIOP_Message_Lite::make_queued_data (size_t sz) +{ + // Get a node for the queue.. + TAO_Queued_Data *qd = + TAO_Queued_Data::get_queued_data (); + + // Make a datablock for the size requested + something. The + // "something" is required because we are going to align the data + // block in the message block. During alignment we could loose some + // bytes. As we may not know how many bytes will be lost, we will + // allocate ACE_CDR::MAX_ALIGNMENT extra. + ACE_Data_Block *db = + this->orb_core_->create_input_cdr_data_block (sz + + ACE_CDR::MAX_ALIGNMENT); + + ACE_Allocator *alloc = + this->orb_core_->input_cdr_msgblock_allocator (); + + ACE_Message_Block mb (db, + 0, + alloc); + + ACE_Message_Block *new_mb = mb.duplicate (); + + ACE_CDR::mb_align (new_mb); + + qd->msg_block_ = new_mb; + + return qd; +} + int TAO_GIOP_Message_Lite::generate_locate_reply_header ( TAO_OutputCDR & /*cdr*/, @@ -1651,22 +1679,3 @@ TAO_GIOP_Message_Lite::header_length (void) const { return TAO_GIOP_LITE_HEADER_LEN; } - -void -TAO_GIOP_Message_Lite::set_queued_data_from_message_header ( - TAO_Queued_Data *qd, - const ACE_Message_Block &mb - ) const -{ - ACE_UNUSED_ARG (qd); - ACE_UNUSED_ARG (mb); -} - -int -TAO_GIOP_Message_Lite::check_for_valid_header ( - const ACE_Message_Block &mb - ) const -{ - ACE_UNUSED_ARG (mb); - return 0; -} diff --git a/TAO/tao/GIOP_Message_Lite.h b/TAO/tao/GIOP_Message_Lite.h index bfc7f5f01ad..b10ef17982c 100644 --- a/TAO/tao/GIOP_Message_Lite.h +++ b/TAO/tao/GIOP_Message_Lite.h @@ -88,11 +88,8 @@ public: /// the message. virtual int format_message (TAO_OutputCDR &cdr); - /// Process the request message that we have received on the - /// connection - virtual int process_request_message (TAO_Transport *transport, - TAO_Queued_Data *qd); - + /// Parse the incoming messages.. + virtual int parse_incoming_messages (ACE_Message_Block &message_block); /// Get the message type. The return value would be one of the /// following: @@ -102,25 +99,33 @@ public: /// TAO_PLUGGABLE_MESSAGE_MESSAGE_ERROR. TAO_Pluggable_Message_Type message_type (void); - /*! - \brief Inspects the bytes in \param mb to see if they "look like" the beginning of a message. - Inspects the bytes in \param mb, beginning at \code mb.rd_ptr, to - see if they look like the beginning of a message. Does + /// Calculate the amount of data that is missing in the + /// message block. + virtual ssize_t missing_data (ACE_Message_Block &message_block); + + /* Extract the details of the next message from the + * through . Returns 1 if there are more messages and returns a + * 0 if there are no more messages in . */ - virtual int check_for_valid_header (const ACE_Message_Block &mb) const; + virtual int extract_next_message (ACE_Message_Block &incoming, + TAO_Queued_Data *&qd); - /*! - \brief Set fields in \param qd based on values derived from \param mb. + /// Check whether the node needs consolidation from + virtual int consolidate_node (TAO_Queued_Data *qd, + ACE_Message_Block &incoming); - This function sets fields in \param qd based on values derived - from \param mb. It assumes that if the length of \param mb is - enough to hold a header, then the data in there can be trusted to - make sense. - */ - virtual void set_queued_data_from_message_header ( - TAO_Queued_Data *, - const ACE_Message_Block &mb) const; + /// Get the details of the message parsed through the . + virtual void get_message_data (TAO_Queued_Data *qd); + + /// @@Bala: Docu??? + virtual int consolidate_fragments (TAO_Queued_Data *dqd, + const TAO_Queued_Data *sqd); + + /// Process the request message that we have received on the + /// connection + virtual int process_request_message (TAO_Transport *transport, + TAO_Queued_Data *qd); /// Parse the reply message that we received and return the reply /// information though diff --git a/TAO/tao/GIOP_Message_State.cpp b/TAO/tao/GIOP_Message_State.cpp index f9c7fbeda1a..e3a3ca20bf3 100644 --- a/TAO/tao/GIOP_Message_State.cpp +++ b/TAO/tao/GIOP_Message_State.cpp @@ -5,65 +5,20 @@ #include "tao/GIOP_Message_Base.h" #include "ace/Log_Msg.h" -#include "ace/OS_NS_string.h" #if !defined (__ACE_INLINE__) # include "tao/GIOP_Message_State.inl" #endif /* __ACE_INLINE__ */ - -class TAO_Debug_Msg_Emitter_Guard -{ -public: - TAO_Debug_Msg_Emitter_Guard (unsigned int debug_level, const char* msg) - : which_level_(debug_level) - { - if (TAO_debug_level < this->which_level_) - { - msg_ = 0; - return; - } - - this->msg_ = new char[ACE_OS::strlen (msg) + MAGIC_LENGTH ]; - ACE_OS::strcpy (this->msg_, msg); - ACE_OS::strcat (this->msg_, " begin\n"); - - if (TAO_debug_level >= this->which_level_) - { - ACE_DEBUG ((LM_DEBUG, this->msg_ )); - } - } - - ~TAO_Debug_Msg_Emitter_Guard () - { - if (this->msg_) - { - if (TAO_debug_level >= this->which_level_) - { - char* begin_start = - this->msg_ + ACE_OS::strlen(this->msg_) - MAGIC_LENGTH + 1; - ACE_OS::strcpy (begin_start, " end\n"); - ACE_DEBUG ((LM_DEBUG, this->msg_)); - } - - delete[] this->msg_; - } - } - -private: - static const int MAGIC_LENGTH; - unsigned int which_level_; - char* msg_; -}; - -const int TAO_Debug_Msg_Emitter_Guard::MAGIC_LENGTH = 8; // " begin\n" + \000 - -ACE_RCSID(tao, GIOP_Message_State, "$Id$") +ACE_RCSID (tao, + GIOP_Message_State, + "$Id$") TAO_GIOP_Message_State::TAO_GIOP_Message_State ( TAO_ORB_Core * /*orb_core*/, - TAO_GIOP_Message_Base * /*base*/) - : giop_version_ (TAO_DEF_GIOP_MAJOR, + TAO_GIOP_Message_Base *base) + : base_ (base), + giop_version_ (TAO_DEF_GIOP_MAJOR, TAO_DEF_GIOP_MINOR), byte_order_ (0), message_type_ (0), @@ -74,79 +29,125 @@ TAO_GIOP_Message_State::TAO_GIOP_Message_State ( { } -// This doesn't check the message block's length, so that means that -// the *caller* needs to do that first. + int -TAO_GIOP_Message_State::take_values_from_message_block ( - const ACE_Message_Block& mb - ) +TAO_GIOP_Message_State::parse_message_header (ACE_Message_Block &incoming) { - const char* buf = mb.rd_ptr (); + if (incoming.length () >= TAO_GIOP_MESSAGE_HEADER_LEN) + { + // Parse the GIOP header + if (this->parse_message_header_i (incoming) == -1) + return -1; + } - // Get the version information - if (this->set_version_info_from_buffer (buf) == -1) + return 0; +} + +int +TAO_GIOP_Message_State::parse_message_header_i (ACE_Message_Block &incoming) +{ + if (TAO_debug_level > 8) { - return -1; + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - GIOP_Message_State::parse_message_header_i\n" + )); } - // Get the byte order information... - if (this->set_byte_order_info_from_buffer (buf) == -1) + // Grab the rd_ptr_ from the message block.. + char *buf = incoming.rd_ptr (); + + // Parse the magic bytes first + if (this->parse_magic_bytes (buf) == -1) { return -1; } + // Get the version information + if (this->get_version_info (buf) == -1) + return -1; + + // Get the byte order information... + if (this->get_byte_order_info (buf) == -1) + return -1; + // Get the message type this->message_type_ = buf[TAO_GIOP_MESSAGE_TYPE_OFFSET]; - // Get the size of the message.. - this->set_payload_size_from_buffer (buf); - // Get the request id - this->parse_fragment_header (buf, mb.length ()); + // Get the size of the message.. + this->get_payload_size (buf); if (this->message_size_ == 0) { - const char* msgname = 0; - - switch (this->message_type_) - { - case TAO_GIOP_MESSAGERROR: - msgname = "GIOP_MESSAGE_ERROR"; break; - case TAO_GIOP_CLOSECONNECTION: - msgname = "GIOP_CLOSE_CONNECTION"; break; - } - if (msgname != 0) + if (this->message_type_ == TAO_GIOP_MESSAGERROR) { if (TAO_debug_level > 0) { - ACE_DEBUG (( - LM_DEBUG, - "(%P|%t) GIOP_Message_State::take_values: %s rcv'd.\n", - msgname - )); + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) -" + "GIOP_MESSAGE_ERROR received \n")); } + return 0; } else { if (TAO_debug_level > 0) - { - ACE_DEBUG ((LM_DEBUG, - "(%P|%t) GIOP_Message_State::take_values: " - "Message of size zero rcv'd.\n")); - } - + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - " + "Message of size zero recd. \n")); return -1; } } + + if (this->more_fragments_) + { + (void) this->parse_fragment_header (buf, + incoming.length ()); + } + return 0; } + + + +int +TAO_GIOP_Message_State::parse_magic_bytes (char *buf) +{ + // The values are hard-coded to support non-ASCII platforms. + if (!(buf [0] == 0x47 // 'G' + && buf [1] == 0x49 // 'I' + && buf [2] == 0x4f // 'O' + && buf [3] == 0x50)) // 'P' + { + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + ACE_LIB_TEXT ("TAO (%P|%t) - bad header, ") + ACE_LIB_TEXT ("magic word [%2.2x,%2.2x,%2.2x,%2.2x]\n"), + buf[0], + buf[1], + buf[2], + buf[3])); + return -1; + } + + return 0; +} + int -TAO_GIOP_Message_State::set_version_info_from_buffer (const char *buf) +TAO_GIOP_Message_State::get_version_info (char *buf) { + if (TAO_debug_level > 8) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - GIOP_Message_State::get_version_info\n")); + } + // We have a GIOP message on hand. Get its revision numbers - CORBA::Octet incoming_major = buf[TAO_GIOP_VERSION_MAJOR_OFFSET]; - CORBA::Octet incoming_minor = buf[TAO_GIOP_VERSION_MINOR_OFFSET]; + CORBA::Octet incoming_major = + buf[TAO_GIOP_VERSION_MAJOR_OFFSET]; + CORBA::Octet incoming_minor = + buf[TAO_GIOP_VERSION_MINOR_OFFSET]; // Check the revision information if (TAO_GIOP_Message_Generator_Parser_Impl::check_revision ( @@ -156,9 +157,7 @@ TAO_GIOP_Message_State::set_version_info_from_buffer (const char *buf) if (TAO_debug_level > 0) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - ") - ACE_TEXT ("GIOP_Message_State::set_version_info_from_buffer:") - ACE_TEXT ("bad version <%d.%d>\n"), + ACE_TEXT ("TAO (%P|%t) - bad version <%d.%d>\n"), incoming_major, incoming_minor)); } @@ -173,9 +172,15 @@ TAO_GIOP_Message_State::set_version_info_from_buffer (const char *buf) } int -TAO_GIOP_Message_State::set_byte_order_info_from_buffer (const char *buf) +TAO_GIOP_Message_State::get_byte_order_info (char *buf) { - // Let us be specific that this is for 1.0 + if (TAO_debug_level > 8) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - GIOP_Message_State::get_byte_order_info\n")); + } + + // Let us be specific that this is for 1.0 if (this->giop_version_.minor == 0 && this->giop_version_.major == 1) { @@ -186,14 +191,10 @@ TAO_GIOP_Message_State::set_byte_order_info_from_buffer (const char *buf) this->byte_order_ != 1) { if (TAO_debug_level > 2) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - GIOP_Message_State::" - "get_byte_order_info, " - "invalid byte order <%d> for version <1.0>\n", - this->byte_order_)); - } - + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - GIOP_Message_State::get_byte_order_info, " + "invalid byte order <%d> for version <1.0>\n", + this->byte_order_)); return -1; } } @@ -210,15 +211,12 @@ TAO_GIOP_Message_State::set_byte_order_info_from_buffer (const char *buf) if ((buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET] & ~0x3) != 0) { if (TAO_debug_level > 2) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - invalid flags for <%d>") - ACE_TEXT (" for version <%d %d> \n"), - buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET], - this->giop_version_.major, - this->giop_version_.minor)); - } - + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - invalid flags for <%d>") + ACE_TEXT (" for version <%d %d> \n"), + buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET], + this->giop_version_.major, + this->giop_version_.minor)); return -1; } } @@ -226,10 +224,19 @@ TAO_GIOP_Message_State::set_byte_order_info_from_buffer (const char *buf) return 0; } +void +TAO_GIOP_Message_State::get_payload_size (char *rd_ptr) +{ + // Move the read pointer + rd_ptr += TAO_GIOP_MESSAGE_SIZE_OFFSET; + + this->message_size_ = this->read_ulong (rd_ptr); +} + int -TAO_GIOP_Message_State::parse_fragment_header (const char *buf, +TAO_GIOP_Message_State::parse_fragment_header (char *buf, size_t length) { size_t len = @@ -239,7 +246,9 @@ TAO_GIOP_Message_State::parse_fragment_header (const char *buf, // By this point we are doubly sure that we have a more or less // valid GIOP message with a valid major revision number. - if (this->giop_version_.minor >= 2 && length > len) + if (this->giop_version_.minor == 2 && + this->message_type_ == TAO_GIOP_FRAGMENT && + length > len) { // Fragmented message in GIOP 1.2 should have a fragment header // following the GIOP header. Grab the rd_ptr to get that @@ -254,7 +263,7 @@ TAO_GIOP_Message_State::parse_fragment_header (const char *buf, } CORBA::ULong -TAO_GIOP_Message_State::read_ulong (const char *rd_ptr) +TAO_GIOP_Message_State::read_ulong (char *rd_ptr) { CORBA::ULong x = 0; diff --git a/TAO/tao/GIOP_Message_State.h b/TAO/tao/GIOP_Message_State.h index 9ae3db82230..f902fa03a0e 100644 --- a/TAO/tao/GIOP_Message_State.h +++ b/TAO/tao/GIOP_Message_State.h @@ -41,10 +41,11 @@ class TAO_Export TAO_GIOP_Message_State public: /// Ctor - TAO_GIOP_Message_State (TAO_ORB_Core *orb_core = 0, - TAO_GIOP_Message_Base *base = 0); + TAO_GIOP_Message_State (TAO_ORB_Core *orb_core, + TAO_GIOP_Message_Base *base); - int take_values_from_message_block (const ACE_Message_Block& mb); + /// Parse the message header. + int parse_message_header (ACE_Message_Block &incoming); /// Return the message size CORBA::ULong message_size (void) const; @@ -52,24 +53,9 @@ public: /// Return the message size CORBA::ULong payload_size (void) const; - /*! - \brief Return the byte order information. - \return 0 big-endian - \return 1 little-endian - */ + /// Return the byte order information CORBA::Octet byte_order (void) const; - /*! - \brief Return GIOP version information. - */ - const TAO_GIOP_Message_Version &giop_version () const; - - /// (Requests and Replys) - CORBA::Octet more_fragments () const; - - /// MsgType above - CORBA::Octet message_type () const; - /// Reset the state.. void reset (void); @@ -77,30 +63,40 @@ private: friend class TAO_GIOP_Message_Base; + /// Parse the message header. + int parse_message_header_i (ACE_Message_Block &incoming); + + /// Checks for the magic word 'GIOP' in the start of the incoing + /// stream + int parse_magic_bytes (char *buf); + /// Extracts the version information from the incoming /// stream. Performs a check for whether the version information is /// right and sets the information in the - int set_version_info_from_buffer (const char *buf); + int get_version_info (char *buf); /// Extracts the byte order information from the incoming /// stream. Performs a check for whether the byte order information /// right and sets the information in the - int set_byte_order_info_from_buffer (const char *buf); + int get_byte_order_info (char *buf); /// Gets the size of the payload and set the size in the - void set_payload_size_from_buffer (const char *buf); + void get_payload_size (char *buf); /// Parses the GIOP FRAGMENT_HEADER information from the incoming /// stream. - int parse_fragment_header (const char *buf, + int parse_fragment_header (char *buf, size_t length); /// Read the unsigned long from the buffer. The should just /// point to the next 4 bytes data that represent the ULong - CORBA::ULong read_ulong (const char *buf); + CORBA::ULong read_ulong (char *buf); private: + /// The GIOP base class.. + TAO_GIOP_Message_Base *base_; + // GIOP version information.. TAO_GIOP_Message_Version giop_version_; diff --git a/TAO/tao/GIOP_Message_State.inl b/TAO/tao/GIOP_Message_State.inl index 80d421c7340..fe076bee689 100644 --- a/TAO/tao/GIOP_Message_State.inl +++ b/TAO/tao/GIOP_Message_State.inl @@ -33,29 +33,22 @@ TAO_GIOP_Message_State::reset (void) this->missing_data_ = 0; } -ACE_INLINE const TAO_GIOP_Message_Version & -TAO_GIOP_Message_State::giop_version () const +#if 0 +ACE_INLINE int +TAO_GIOP_Message_State::message_fragmented (void) { - return this->giop_version_; -} + if (this->more_fragments) + return 1; -ACE_INLINE CORBA::Octet -TAO_GIOP_Message_State::more_fragments () const -{ - return this->more_fragments_; + return 0; } -ACE_INLINE CORBA::Octet -TAO_GIOP_Message_State::message_type () const -{ - return this->message_type_; -} -ACE_INLINE void -TAO_GIOP_Message_State::set_payload_size_from_buffer (const char *rd_ptr) -{ - // Move the read pointer - rd_ptr += TAO_GIOP_MESSAGE_SIZE_OFFSET; - this->message_size_ = this->read_ulong (rd_ptr); +ACE_INLINE CORBA::Boolean +TAO_GIOP_Message_State::header_received (void) const +{ + return this->message_size != 0; } + +#endif diff --git a/TAO/tao/IIOP_Transport.cpp b/TAO/tao/IIOP_Transport.cpp index 25b16308ff1..1d802aa25c7 100644 --- a/TAO/tao/IIOP_Transport.cpp +++ b/TAO/tao/IIOP_Transport.cpp @@ -95,7 +95,7 @@ TAO_IIOP_Transport::recv (char *buf, { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - IIOP_Transport[%d]::recv, ") + ACE_TEXT ("TAO (%P|%t) - IIOP_Transport[%d]::recv_i, ") ACE_TEXT ("read failure - %m\n"), this->id ())); } diff --git a/TAO/tao/Incoming_Message_Queue.cpp b/TAO/tao/Incoming_Message_Queue.cpp index a510be13f4b..14970106b50 100644 --- a/TAO/tao/Incoming_Message_Queue.cpp +++ b/TAO/tao/Incoming_Message_Queue.cpp @@ -1,8 +1,7 @@ #include "Incoming_Message_Queue.h" -#include "Pluggable_Messaging.h" #include "debug.h" -#include "ace/Malloc_T.h" -#include "ace/Message_Block.h" + +#include "ace/Log_Msg.h" #if !defined (__ACE_INLINE__) # include "Incoming_Message_Queue.inl" @@ -13,7 +12,7 @@ ACE_RCSID (tao, "$Id$") TAO_Incoming_Message_Queue::TAO_Incoming_Message_Queue (TAO_ORB_Core *orb_core) - : last_added_ (0), + : queued_data_ (0), size_ (0), orb_core_ (orb_core) { @@ -43,21 +42,21 @@ TAO_Incoming_Message_Queue::copy_tail (ACE_Message_Block &block) { // Check to see if the length of the incoming block is less than // that of the of the tail. - if (block.length () <= this->last_added_->missing_data_bytes_) + if ((CORBA::Long)block.length () <= this->queued_data_->missing_data_) { n = block.length (); } else { - n = this->last_added_->missing_data_bytes_; + n = this->queued_data_->missing_data_; } // Do the copy - this->last_added_->msg_block_->copy (block.rd_ptr (), + this->queued_data_->msg_block_->copy (block.rd_ptr (), n); // Decerement the missing data - this->last_added_->missing_data_bytes_ -= n; + this->queued_data_->missing_data_ -= n; } return n; @@ -66,20 +65,17 @@ TAO_Incoming_Message_Queue::copy_tail (ACE_Message_Block &block) TAO_Queued_Data * TAO_Incoming_Message_Queue::dequeue_head (void) { - if (this->size_ == 0) - return 0; - // Get the node on the head of the queue... - TAO_Queued_Data *head = this->last_added_->next_; + TAO_Queued_Data *tmp = + this->queued_data_->next_; // Reset the head node.. - this->last_added_->next_ = head->next_; - - // Decrease the size and reset last_added_ if empty - if (--this->size_ == 0) - this->last_added_ = 0; + this->queued_data_->next_ = tmp->next_; + + // Decrease the size + --this->size_; - return head; + return tmp; } TAO_Queued_Data * @@ -90,412 +86,95 @@ TAO_Incoming_Message_Queue::dequeue_tail (void) return 0; // Get the node on the head of the queue... - TAO_Queued_Data *head = - this->last_added_->next_; + TAO_Queued_Data *tmp = + this->queued_data_->next_; - while (head->next_ != this->last_added_) + while (tmp->next_ != this->queued_data_) { - head = head->next_; + tmp = tmp->next_; } // Put the head in tmp. - head->next_ = this->last_added_->next_; + tmp->next_ = this->queued_data_->next_; - TAO_Queued_Data *ret_qd = this->last_added_; + TAO_Queued_Data *ret_qd = this->queued_data_; - this->last_added_ = head; + this->queued_data_ = tmp; // Decrease the size - if (--this->size_ == 0) - this->last_added_ = 0; + --this->size_; return ret_qd; } + int TAO_Incoming_Message_Queue::enqueue_tail (TAO_Queued_Data *nd) { if (this->size_ == 0) { - this->last_added_ = nd; - this->last_added_->next_ = this->last_added_; + this->queued_data_ = nd; + this->queued_data_->next_ = this->queued_data_; } else { - nd->next_ = this->last_added_->next_; - this->last_added_->next_ = nd; - this->last_added_ = nd; + nd->next_ = this->queued_data_->next_; + this->queued_data_->next_ = nd; + this->queued_data_ = nd; } ++ this->size_; return 0; } -TAO_Queued_Data * -TAO_Incoming_Message_Queue::find_fragment (CORBA::Octet major, - CORBA::Octet minor) const -{ - TAO_Queued_Data *found = 0; - if (this->last_added_ != 0) - { - TAO_Queued_Data *qd = this->last_added_->next_; - - do { - if (qd->more_fragments_ && - qd->major_version_ == major && qd->minor_version_ == minor) - { - found = qd; - } - else - { - qd = qd->next_; - } - } while (found == 0 && qd != this->last_added_->next_); - } - - return found; -} - -TAO_Queued_Data * -TAO_Incoming_Message_Queue::find_fragment (CORBA::ULong request_id) const -{ - TAO_Queued_Data *found = 0; - if (this->last_added_ != 0) - { - TAO_Queued_Data *qd = this->last_added_->next_; - - do { - if (qd->more_fragments_ && qd->request_id_ == request_id) - { - found = qd; - } - else - { - qd = qd->next_; - } - } while (found == 0 && qd != this->last_added_->next_); - } - - return found; -} - /************************************************************************/ // Methods for TAO_Queued_Data /************************************************************************/ TAO_Queued_Data::TAO_Queued_Data (ACE_Allocator *alloc) - : msg_block_ (0) - , current_state_ (INVALID) - , missing_data_bytes_ (0) - , byte_order_ (0) - , major_version_ (0) - , minor_version_ (0) - , more_fragments_ (0) - , request_id_ (0) - , msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR) - , next_ (0) - , allocator_ (alloc) + : msg_block_ (0), + missing_data_ (0), + byte_order_ (0), + major_version_ (0), + minor_version_ (0), + more_fragments_ (0), + msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR), + next_ (0), + allocator_ (alloc) { } TAO_Queued_Data::TAO_Queued_Data (ACE_Message_Block *mb, ACE_Allocator *alloc) - : msg_block_ (mb) - , current_state_ (INVALID) - , missing_data_bytes_ (0) - , byte_order_ (0) - , major_version_ (0) - , minor_version_ (0) - , more_fragments_ (0) - , request_id_ (0) - , msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR) - , next_ (0) - , allocator_ (alloc) + : msg_block_ (mb), + missing_data_ (0), + byte_order_ (0), + major_version_ (0), + minor_version_ (0), + more_fragments_ (0), + msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR), + next_ (0), + allocator_ (alloc) { } TAO_Queued_Data::TAO_Queued_Data (const TAO_Queued_Data &qd) - : msg_block_ (qd.msg_block_->duplicate ()) - , current_state_ (qd.current_state_) - , missing_data_bytes_ (qd.missing_data_bytes_) - , byte_order_ (qd.byte_order_) - , major_version_ (qd.major_version_) - , minor_version_ (qd.minor_version_) - , more_fragments_ (qd.more_fragments_) - , request_id_ (qd.request_id_) - , msg_type_ (qd.msg_type_) - , next_ (0) - , allocator_ (qd.allocator_) + : msg_block_ (qd.msg_block_->duplicate ()), + missing_data_ (qd.missing_data_), + byte_order_ (qd.byte_order_), + major_version_ (qd.major_version_), + minor_version_ (qd.minor_version_), + more_fragments_ (qd.more_fragments_), + msg_type_ (qd.msg_type_), + next_ (0), + allocator_ (qd.allocator_) { } - -/*! - \brief Allocate and return a new empty message block of size \a new_size mimicking parameters of \a mb. - - This function allocates a new aligned message block using the same - allocators and flags as found in \a mb. The size of the new message - block is at least \a new_size; the size may be adjusted up in order - to accomodate alignment requirements and still fit \a new_size bytes - into the aligned buffer. - - \param mb message block whose parameters should be mimicked - \param new_size size of the new message block (will be adjusted for proper alignment) - \return an aligned message block with rd_ptr sitting at correct alignment spot, 0 on failure - - \author Thanks to Rich Seibel for helping implement with the public API for ACE_Message_Block! - */ -static ACE_Message_Block* -clone_mb_nocopy_size (ACE_Message_Block *mb, size_t span_size) -{ - // Calculate the required size of the cloned block with alignment - size_t aligned_size = ACE_CDR::first_size (span_size + ACE_CDR::MAX_ALIGNMENT); - - // Get the allocators - ACE_Allocator *data_allocator; - ACE_Allocator *data_block_allocator; - ACE_Allocator *message_block_allocator; - mb->access_allocators (data_allocator, - data_block_allocator, - message_block_allocator); - - // Create a new Message Block - ACE_Message_Block *nb; - ACE_NEW_MALLOC_RETURN (nb, - ACE_static_cast(ACE_Message_Block*, - message_block_allocator->malloc ( - sizeof (ACE_Message_Block))), - ACE_Message_Block(aligned_size, - mb->msg_type(), - mb->cont(), - 0, //we want the data block created - data_allocator, - mb->locking_strategy(), - mb->msg_priority(), - mb->msg_execution_time (), - mb->msg_deadline_time (), - data_block_allocator, - message_block_allocator), - 0); - - ACE_CDR::mb_align (nb); - - // Copy the flags over, but be SURE to clear the DONT_DELETE flag, since - // we just dynamically allocated the two things. - nb->set_flags (mb->flags()); - nb->clr_flags (ACE_Message_Block::DONT_DELETE); - - return nb; -} - -/*! - \brief Copy data from \a src->rd_ptr to \a dst->wr_ptr, of at most \a span_size bytes. - - (This is similar to memcpy, although with message blocks we can be a - little smarter.) This function assumes that \a dst has enough space - for \a span_size bytes, and that \a src has at least \a span_size - bytes available to copy. When everything is copied \a dst->wr_ptr - gets updated accordingly, but \a src->rd_ptr is left to the caller - to update. - - \param dst the destination message block - \param src the source message block - \param span_size size of the maximum span of bytes to be copied - \return 0 on failure, otherwise \a dst - */ -static ACE_Message_Block* -copy_mb_span (ACE_Message_Block *dst, ACE_Message_Block *src, size_t span_size) -{ - // @todo check for enough space in dst, and src contains at least span_size - - if (src == 0 || dst == 0) - return 0; - - if (span_size == 0) - return dst; - - dst->copy (src->rd_ptr (), span_size); - return dst; -} - /*static*/ TAO_Queued_Data * -TAO_Queued_Data::make_uncompleted_message (ACE_Message_Block *mb, - TAO_Pluggable_Messaging &msging_obj, - ACE_Allocator *alloc) -{ - register TAO_Queued_Data *new_qd = 0; - register const size_t HDR_LEN = msging_obj.header_length (); /* COMPUTE ONCE! */ - register const size_t MB_LEN = mb->length (); /* COMPUTE ONCE! */ - - // Validate arguments. - if (mb == 0) - goto failure; - - new_qd = make_queued_data (alloc); - if (new_qd == 0) - goto failure; - - // do we have enough bytes to make a complete header? - if (MB_LEN >= HDR_LEN) - { - // Since we have enough bytes to make a complete header, - // the header needs to be valid. Check that now, and punt - // if it's not valid. - if (! msging_obj.check_for_valid_header (*mb)) - { - goto failure; - } - else - { - new_qd->current_state_ = WAITING_TO_COMPLETE_PAYLOAD; - msging_obj.set_queued_data_from_message_header (new_qd, *mb); - if (new_qd->current_state_ == INVALID) - goto failure; - - // missing_data_bytes_ now has the full GIOP message size, so we allocate - // a new message block of that size, plus the header. - new_qd->msg_block_ = clone_mb_nocopy_size (mb, - new_qd->missing_data_bytes_ + - HDR_LEN); - // Of course, we don't have the whole message (if we did, we - // wouldn't be here!), so we copy only what we've got, i.e., whatever's - // in the message block. - if (copy_mb_span (new_qd->msg_block_, mb, MB_LEN) == 0) - goto failure; - - // missing_data_bytes_ now has the full GIOP message size, but - // there might still be stuff in mb. Therefore, we have to adjust - // missing_data_bytes_, i.e., decrease it by the number of "actual - // payload bytes" in mb. - // - // "actual payload bytes" :== length of mb (which included the header) - header length - new_qd->missing_data_bytes_ -= (MB_LEN - HDR_LEN); - mb->rd_ptr (MB_LEN); - } - } - else - { - new_qd->current_state_ = WAITING_TO_COMPLETE_HEADER; - new_qd->msg_block_ = clone_mb_nocopy_size (mb, HDR_LEN); - if (new_qd->msg_block_ == 0 || - copy_mb_span (new_qd->msg_block_, mb, MB_LEN) == 0) - goto failure; - new_qd->missing_data_bytes_ = HDR_LEN - MB_LEN; - mb->rd_ptr (MB_LEN); - } - - ACE_ASSERT (new_qd->current_state_ != INVALID); - if (TAO_debug_level > 7) - { - const char* s = "?unk?"; - switch (new_qd->current_state_) - { - case WAITING_TO_COMPLETE_HEADER: s = "WAITING_TO_COMPLETE_HEADER"; break; - case WAITING_TO_COMPLETE_PAYLOAD: s = "WAITING_TO_COMPLETE_PAYLOAD"; break; - case INVALID: s = "INVALID"; break; - case COMPLETED: s = "COMPLETED"; break; - } - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) Queued_Data::make_uncompleted_message: ") - ACE_TEXT ("made uncompleted message from %u bytes into qd=%-08x:") - ACE_TEXT ("state=%s,missing_data_bytes=%u\n"), - new_qd->msg_block_->length(), new_qd, s, new_qd->missing_data_bytes_)); - } - return new_qd; - -failure: - if (TAO_debug_level > 7) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) Queued_Data::make_uncompleted_message: ") - ACE_TEXT ("failed to make uncompleted message: mb=%-08x, qd=%-08x\n"), - mb, new_qd)); - } - TAO_Queued_Data::release (new_qd); - return 0; -} - - -/*static*/ -TAO_Queued_Data * -TAO_Queued_Data::make_completed_message (ACE_Message_Block &mb, - TAO_Pluggable_Messaging &msging_obj, - ACE_Allocator *alloc) -{ - register const size_t HDR_LEN = msging_obj.header_length (); - register const size_t MB_LEN = mb.length (); - - // Validate arguments. - if (MB_LEN < HDR_LEN) - return 0; - - size_t total_msg_len = 0; - register TAO_Queued_Data *new_qd = make_queued_data (alloc); - if (new_qd == 0) - goto failure; - - // We can assume that there are enough bytes for a header, so - // extract the header data. Don't assume that there's enough for - // the payload just yet. - new_qd->current_state_ = WAITING_TO_COMPLETE_PAYLOAD; - msging_obj.set_queued_data_from_message_header (new_qd, mb); - if (new_qd->current_state_ == INVALID) - goto failure; - - // new_qd_->missing_data_bytes_ + protocol header length should be - // *at least* the length of the message. Verify that we have that - // many bytes in the message block and, if we don't, release the new - // qd and fail. - total_msg_len = new_qd->missing_data_bytes_ + HDR_LEN; - if (total_msg_len > MB_LEN) - goto failure; - - // Make a copy of the relevant portion of mb and hang on to it - if ((new_qd->msg_block_ = clone_mb_nocopy_size (&mb, total_msg_len)) == 0) - goto failure; - - if (copy_mb_span (new_qd->msg_block_, &mb, total_msg_len) == 0) - goto failure; - - // Update missing data and the current state - new_qd->missing_data_bytes_ = 0; - new_qd->current_state_ = COMPLETED; - - // Advance the rd_ptr on the message block - mb.rd_ptr (total_msg_len); - - if (TAO_debug_level > 7) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) Queued_Data::make_complete_message: ") - ACE_TEXT ("extracted complete message (%u bytes incl hdr) from mblk=%-08x into qd=%-08x\n"), - total_msg_len, &mb, new_qd)); - } - - return new_qd; - -failure: - if (TAO_debug_level > 7) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) Queued_Data::make_complete_message: ") - ACE_TEXT ("failed to find complete message in mblk=%-08x; leaving %u bytes in block\n"), - &mb, MB_LEN)); - if (TAO_debug_level >= 10) - ACE_HEX_DUMP ((LM_DEBUG, - mb.rd_ptr (), MB_LEN, - ACE_TEXT (" residual bytes in buffer"))); - - } - TAO_Queued_Data::release (new_qd); - return 0; -} - -/*static*/ -TAO_Queued_Data * -TAO_Queued_Data::make_queued_data (ACE_Allocator *alloc) +TAO_Queued_Data::get_queued_data (ACE_Allocator *alloc) { TAO_Queued_Data *qd = 0; @@ -602,29 +281,3 @@ TAO_Queued_Data::duplicate (TAO_Queued_Data &sqd) return qd; } - -void -TAO_Queued_Data::consolidate (void) -{ - // Is this a chain of fragments? - if (this->more_fragments_ && this->msg_block_->cont () != 0) - { - // Create a message block big enough to hold the entire chain - ACE_Message_Block *dest = clone_mb_nocopy_size ( - this->msg_block_, - this->msg_block_->total_length ()); - // Reset the cont() parameter - dest->cont (0); - - // Use ACE_CDR to consolidate the chain for us - ACE_CDR::consolidate (dest, this->msg_block_); - - // free the original message block chain - this->msg_block_->release (); - - // Set the message block to the new consolidated message block - this->msg_block_ = dest; - this->more_fragments_ = 0; - } -} - diff --git a/TAO/tao/Incoming_Message_Queue.h b/TAO/tao/Incoming_Message_Queue.h index 660a207090b..3adfc3d24ac 100644 --- a/TAO/tao/Incoming_Message_Queue.h +++ b/TAO/tao/Incoming_Message_Queue.h @@ -27,7 +27,6 @@ class ACE_Allocator; class TAO_ORB_Core; class TAO_Queued_Data; class TAO_Transport; -class TAO_Pluggable_Messaging; /** * @class TAO_Incoming_Message_Queue @@ -76,68 +75,31 @@ public: /// Return the length of the queue.. CORBA::ULong queue_length (void); - /*! - @name Node Inspection Predicates - - \brief These methods allow inspection of head and tail nodes for "completeness". - - These methods check to see whether the node on the head or tail is - "complete" and ready for further processing. See each method's - documentation for its definition of "complete". - */ - //@{ - /*! - "complete" == the GIOP message at the tail is not missing any data (it may be a complete GIOP Fragment, though) - - \return -1 queue is empty - \return 0 tail is not "complete" - \return 1 tail is "complete" - */ + /// Methods for sanity check. Checks to see whether the node on the + /// head or tail is complete or not and ready for further + /// processing. int is_tail_complete (void); - - /*! - - "complete" == the GIOP message at the head is not missing any data - AND, if it's the first message in a series of GIOP fragments, all - the fragments have been received, parsed, and placed into the - queue - - \return -1 if queue is empty - \return 0 if head is not "complete" - \return 1 if head is "complete" - */ int is_head_complete (void); - //@} - /*! - \brief Check to see if the message at the tail (complete or incomplete) is a GIOP Fragment. - */ + /// This method checks whether the last message that was queued up + /// was fragmented... int is_tail_fragmented (void); /// Return the size of data that is missing in tail of the queue. size_t missing_data_tail (void) const; /// void missing_data (size_t data); - /// Find the first fragment that matches the GIOP version - TAO_Queued_Data *find_fragment (CORBA::Octet major, - CORBA::Octet minor) const; - - /// Find the first fragment that matches the request id - TAO_Queued_Data *find_fragment (CORBA::ULong request_id) const; - private: friend class TAO_Transport; + /// Make a node for the queue. + TAO_Queued_Data *get_node (void); + private: - /*! - \brief A circular linked list of messages awaiting processing. - \a last_message_added_ points to the most recent message added to - the list. The earliest addition can be easily accessed via - \a last_message_added_->next_. - */ - TAO_Queued_Data *last_added_; + /// A linked listof messages that await processing + TAO_Queued_Data *queued_data_; /// The size of the queue CORBA::ULong size_; @@ -161,73 +123,20 @@ private: class TAO_Export TAO_Queued_Data { -protected: +public: /// Default Constructor TAO_Queued_Data (ACE_Allocator *alloc = 0); /// Constructor. TAO_Queued_Data (ACE_Message_Block *mb, ACE_Allocator *alloc = 0); -public: /// Copy constructor. TAO_Queued_Data (const TAO_Queued_Data &qd); - /*! - \name Factory Methods - - These methods manufacture instances of TAO_Queued_Data and return - them. These instances should be removed via TAO_Queued_Data::release. - - Instances are initialized from data in the ACE_Message_Block, - interpreted according to rules defined in the - TAO_Pluggable_Messaging object. - - The manufactured instance adopts the message block \em without - duplicating it; therefore, the caller must duplicate or orphan the - message block. The caller also must insure that the message block - can be released via ACE_Message_Block::release, and that its life - endures beyond the calling scope. - - For the purposes of TAO_Queued_Data, a completed message is a - completely received message as defined by the messaging protocol - object. For GIOP, that means that the number of bytes specified - in the general GIOP header have been completely received. It - specifically DOES NOT mean that all \em fragments have been - received. Fragment reassembly is another matter altogether. - */ - //@{ - /*! - \brief Make and return an instance of TAO_Queued_Data suitable for use as an uncompleted message. - */ - static TAO_Queued_Data* make_uncompleted_message (ACE_Message_Block *mb, - TAO_Pluggable_Messaging &msging_obj, - ACE_Allocator *alloc = 0); - /*! - \brief Make and return an instance of TAO_Queued_Data suitable for use as a completed message. - */ - // THIS IMPLEMENTATION DOESN'T WORK THE SAME AS ITS USAGE! - // WE CAN'T JUST ADOPT mb, BECAUSE IT MAY CONTAIN MORE THAN - // ONE PROTOCOL MESSAGE. WE THEREFORE NEED TO CLONE IT. THIS - // MEANS UPDATING THE DOCUMENTATION, AND IT ALSO MEANS THAT IT - // BEHAVES DIFFERENTLY FROM make_uncompleted_message. - static TAO_Queued_Data* make_completed_message (ACE_Message_Block &mb, - TAO_Pluggable_Messaging &msging_obj, - ACE_Allocator *alloc = 0); - - /// Consolidate this fragments chained message blocks into one. - void consolidate (void); - - /*! - \brief Creation and deletion of a node in the queue. - \todo Maybe this should be private? - */ -private: - static TAO_Queued_Data* make_queued_data (ACE_Allocator *alloc = 0); -public: - //@} + /// Creation and deletion of a node in the queue. + static TAO_Queued_Data* get_queued_data (ACE_Allocator *alloc = 0); static void release (TAO_Queued_Data *qd); - void release (void); /// Duplicate ourselves. This creates a copy of ourselves on the /// heap and returns a pointer to the duplicated node. @@ -237,43 +146,11 @@ public: /// The message block that contains the message. ACE_Message_Block *msg_block_; - /*! - @name Missing Data details - - The \a missing_data_bytes_ member contains the number of bytes of - data missing from \a msg_block_. However, there can be two places - where data is missing: header and payload. We cannot know how - much data is missing from the payload until we have a complete - header. Fortunately, headers are a fixed length, so we can know - how much we're missing from the header. - - We use \param current_state_ to indicate which portion of the message - \param missing_data_bytes_ refers to, as well as the general state of - the message. - */ - //@{ - /*! - Describes the meaning given to the number stored in \a missing_data_bytes_. - */ - enum Queued_Data_State - { - INVALID = -1, //!< The queued data is in an invalid/uninitialized state, and no data should be trusted. - COMPLETED = 0, //!< Message is complete; \a missing_data_bytes_ should be zero. - WAITING_TO_COMPLETE_HEADER, //!< Value in \a missing_data_bytes_ indicates part of header is missing. - WAITING_TO_COMPLETE_PAYLOAD //!< Value in \a missing_data_bytes_ indicates part of payload is missing. - }; - - /*! - Indicates the current state of the message, including hints at - how to interpret the value stored in \a missing_data_bytes_. - */ - Queued_Data_State current_state_; - - /*! Data missing in the above message that hasn't been read or processed yet. */ - size_t missing_data_bytes_; - //@} - - /*! The byte order of the message that is stored in the node. */ + /// Data missing in the above message that hasn't been read or + /// processed yet. + CORBA::Long missing_data_; + + /// The byte order of the message that is stored in the node.. CORBA::Octet byte_order_; /// Many protocols like GIOP have a major and minor version @@ -288,9 +165,6 @@ public: /// queue already has more fragments that is missing.. CORBA::Octet more_fragments_; - /// The fragment request id - CORBA::ULong request_id_; - /// The message type of the message TAO_Pluggable_Message_Type msg_type_; diff --git a/TAO/tao/Incoming_Message_Queue.inl b/TAO/tao/Incoming_Message_Queue.inl index f82267b5cea..d67bd485383 100644 --- a/TAO/tao/Incoming_Message_Queue.inl +++ b/TAO/tao/Incoming_Message_Queue.inl @@ -18,7 +18,7 @@ TAO_Incoming_Message_Queue::is_tail_complete (void) return -1; if (this->size_ && - this->last_added_->missing_data_bytes_ == 0) + this->queued_data_->missing_data_ == 0) return 1; return 0; @@ -31,8 +31,8 @@ TAO_Incoming_Message_Queue::is_head_complete (void) return -1; if (this->size_ && - this->last_added_->next_->missing_data_bytes_ == 0 && - this->last_added_->next_->more_fragments_ == 0) + this->queued_data_->next_->missing_data_ == 0 && + this->queued_data_->next_->more_fragments_ == 0) return 1; return 0; @@ -45,7 +45,7 @@ TAO_Incoming_Message_Queue::is_tail_fragmented (void) return 0; if (this->size_ && - this->last_added_->more_fragments_ == 1) + this->queued_data_->more_fragments_ == 1) return 1; return 0; @@ -55,22 +55,23 @@ ACE_INLINE size_t TAO_Incoming_Message_Queue::missing_data_tail (void) const { if (this->size_ != 0) - return this->last_added_->missing_data_bytes_; + return this->queued_data_->missing_data_; return 0; } -/************************************************************************/ -// Methods for TAO_Queued_Data -/************************************************************************/ -ACE_INLINE void -TAO_Queued_Data::release (void) +ACE_INLINE TAO_Queued_Data * +TAO_Incoming_Message_Queue::get_node (void) { - TAO_Queued_Data::release (this); + return TAO_Queued_Data::get_queued_data (); } +/************************************************************************/ +// Methods for TAO_Queued_Data +/************************************************************************/ + /*static*/ ACE_INLINE void TAO_Queued_Data::replace_data_block (ACE_Message_Block &mb) diff --git a/TAO/tao/Pluggable_Messaging.h b/TAO/tao/Pluggable_Messaging.h index 3e84635d767..8ac38de01e0 100644 --- a/TAO/tao/Pluggable_Messaging.h +++ b/TAO/tao/Pluggable_Messaging.h @@ -121,30 +121,36 @@ public: virtual void init (CORBA::Octet major, CORBA::Octet minor) = 0; + /// Parse the incoming messages.. + virtual int parse_incoming_messages (ACE_Message_Block &message_block) = 0; + + /// Calculate the amount of data that is missing in the + /// message block. + virtual ssize_t missing_data (ACE_Message_Block &incoming) = 0; + + /// Get the details of the message parsed through the . + virtual void get_message_data (TAO_Queued_Data *qd) = 0; + + /* Extract the details of the next message from the + * through . Returns 1 if there are more messages and returns a + * 0 if there are no more messages in . + */ + virtual int extract_next_message (ACE_Message_Block &incoming, + TAO_Queued_Data *&qd) = 0; + + /// Check whether the node needs consolidation from + virtual int consolidate_node (TAO_Queued_Data *qd, + ACE_Message_Block &incoming) = 0; + + /// @@Bala:Docu?? + virtual int consolidate_fragments (TAO_Queued_Data *dqd, + const TAO_Queued_Data *sqd) = 0; + /// Parse the request message, make an upcall and send the reply back /// to the "request initiator" virtual int process_request_message (TAO_Transport *transport, TAO_Queued_Data *qd) = 0; - /*! - \brief Inspects the bytes in \param mb to see if they "look like" the beginning of a message. - - Inspects the bytes in \param mb, beginning at \code mb.rd_ptr, to - see if they look like the beginning of a message. Does - */ - virtual int check_for_valid_header (const ACE_Message_Block &mb) const = 0; - - /*! - \brief Set fields in \param qd based on values derived from \param mb. - - This function sets fields in \param qd based on values derived - from \param mb. It assumes that if the length of \param mb is - enough to hold a header, then the data in there can be trusted to - make sense. - */ - virtual void set_queued_data_from_message_header ( - TAO_Queued_Data *, - const ACE_Message_Block &mb) const = 0; /// Parse the reply message that we received and return the reply /// information though diff --git a/TAO/tao/Strategies/DIOP_Transport.cpp b/TAO/tao/Strategies/DIOP_Transport.cpp index aa5a569892b..d4edd4c8c80 100644 --- a/TAO/tao/Strategies/DIOP_Transport.cpp +++ b/TAO/tao/Strategies/DIOP_Transport.cpp @@ -88,19 +88,12 @@ TAO_DIOP_Transport::send (iovec *iov, int iovcnt, for (int i = 0; i < iovcnt; i++) bytes_to_send += iov[i].iov_len; - ssize_t n = this->connection_handler_->dgram ().send (iov, - iovcnt, - addr); + this->connection_handler_->dgram ().send (iov, + iovcnt, + addr); // @@ Michael: // Always return a positive number of bytes sent, as we do // not handle sending errors in DIOP. - if (n == -1 && TAO_debug_level > 0) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO: (%P|%t|%N|%l) Send of %d bytes failed %p\n"), - bytes_to_send, - ACE_TEXT ("send_i ()\n"))); - } bytes_transferred = bytes_to_send; @@ -198,7 +191,7 @@ TAO_DIOP_Transport::handle_input (TAO_Resume_Handle &rh, // Read the message into the message block that we have created on // the stack. - ssize_t n = this->recv (message_block.wr_ptr (), + ssize_t n = this->recv (message_block.rd_ptr (), message_block.space (), max_wait_time); @@ -206,7 +199,6 @@ TAO_DIOP_Transport::handle_input (TAO_Resume_Handle &rh, if (n <= 0) { if (n == -1) - // @@ Why not send_connection_closed_notifications() ? this->tms_->connection_closed (); return n; @@ -215,43 +207,23 @@ TAO_DIOP_Transport::handle_input (TAO_Resume_Handle &rh, // Set the write pointer in the stack buffer message_block.wr_ptr (n); - // Check the incoming message for validity. The check needs to be + // Parse the incoming message for validity. The check needs to be // performed by the messaging objects. - if (this->messaging_object ()->check_for_valid_header (message_block) == 0) - { - if (TAO_debug_level) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO: (%P|%t|%N|%l) failed to find a valid header on transport %d after fault %p\n"), - this->id (), - ACE_TEXT ("handle_input_i ()\n"))); - } - - return -1; - } + if (this->parse_incoming_messages (message_block) == -1) + return -1; // NOTE: We are not performing any queueing nor any checking for - // missing data. We are assuming that ALL the data arrives in a + // missing data. We are assuming that ALL the data would be got in a // single read. // Make a node of the message block.. - // - // We could make this more efficient by having a fixed Queued Data - // allocator, i.e., it always gave back the same thing. Actually, - // we *could* create an allocator that took a stack-allocated object - // as an argument and returned that when asked an allocation is - // done. Something to contemplate... - TAO_Queued_Data* qd = - TAO_Queued_Data::make_completed_message (message_block, - *this->messaging_object ()); - int retval = -1; - if (qd) - { - // Process the message - retval = this->process_parsed_messages (qd, rh); - TAO_Queued_Data::release (qd); - } - return retval; + TAO_Queued_Data qd (&message_block); + + // Extract the data for the node.. + this->messaging_object ()->get_message_data (&qd); + + // Process the message + return this->process_parsed_messages (&qd, rh); } @@ -338,175 +310,4 @@ TAO_DIOP_Transport::messaging_init (CORBA::Octet major, return 1; } -// @@ Frank: Hopefully DIOP doesn't need this -/* -int -TAO_DIOP_Transport::tear_listen_point_list (TAO_InputCDR &cdr) -{ - CORBA::Boolean byte_order; - if ((cdr >> ACE_InputCDR::to_boolean (byte_order)) == 0) - return -1; - - cdr.reset_byte_order (ACE_static_cast(int,byte_order)); - - DIOP::ListenPointList listen_list; - if ((cdr >> listen_list) == 0) - return -1; - - // As we have received a bidirectional information, set the flag to - // 1 - this->bidirectional_flag (1); - return this->connection_handler_->process_listen_point_list (listen_list); -} -*/ - - - -// @@ Frank: Hopefully DIOP doesn't need this -/* -void -TAO_DIOP_Transport::set_bidir_context_info (TAO_Operation_Details &opdetails) -{ - - // Get a handle on to the acceptor registry - TAO_Acceptor_Registry * ar = - this->orb_core ()->acceptor_registry (); - - - // Get the first acceptor in the registry - TAO_AcceptorSetIterator acceptor = ar->begin (); - - DIOP::ListenPointList listen_point_list; - - for (; - acceptor != ar->end (); - acceptor++) - { - // Check whether it is a DIOP acceptor - if ((*acceptor)->tag () == TAO_TAG_UDP_PROFILE) - { - this->get_listen_point (listen_point_list, - *acceptor); - } - } - - // We have the ListenPointList at this point. Create a output CDR - // stream at this point - TAO_OutputCDR cdr; - - // Marshall the information into the stream - if ((cdr << ACE_OutputCDR::from_boolean (TAO_ENCAP_BYTE_ORDER)== 0) - || (cdr << listen_point_list) == 0) - return; - - // Add this info in to the svc_list - opdetails.service_context ().set_context (IOP::BI_DIR_DIOP, - cdr); - - return; -} - - -int -TAO_DIOP_Transport::get_listen_point ( - DIOP::ListenPointList &listen_point_list, - TAO_Acceptor *acceptor) -{ - TAO_DIOP_Acceptor *iiop_acceptor = - ACE_dynamic_cast (TAO_DIOP_Acceptor *, - acceptor ); - - // Get the array of endpoints serviced by - const ACE_INET_Addr *endpoint_addr = - iiop_acceptor->endpoints (); - - // Get the count - size_t count = - iiop_acceptor->endpoint_count (); - - // Get the local address of the connection - ACE_INET_Addr local_addr; - - if (this->connection_handler_->peer ().get_local_addr (local_addr) - == -1) - { - ACE_ERROR_RETURN ((LM_ERROR, - ACE_TEXT ("(%P|%t) Could not resolve local host") - ACE_TEXT (" address in set_bidir_context_info () \n")), - -1); - } - - - // Note: Looks like there is no point in sending the list of - // endpoints on interfaces on which this connection has not - // been established. If this is wrong, please correct me. - char *local_interface = 0; - - // Get the hostname for the local address - if (iiop_acceptor->hostname (this->orb_core_, - local_addr, - local_interface) == -1) - { - ACE_ERROR_RETURN ((LM_ERROR, - ACE_TEXT ("(%P|%t) Could not resolve local host") - ACE_TEXT (" name \n")), - -1); - } - - ACE_INET_Addr *tmp_addr = ACE_const_cast (ACE_INET_Addr *, - endpoint_addr); - - for (size_t index = 0; - index <= count; - index++) - { - // Get the listen point on that acceptor if it has the same - // interface on which this connection is established - char *acceptor_interface = 0; - - if (iiop_acceptor->hostname (this->orb_core_, - tmp_addr[index], - acceptor_interface) == -1) - continue; - - // @@ This is very bad for performance, but it is a one time - // affair - if (ACE_OS::strcmp (local_interface, - acceptor_interface) == 0) - { - // We have the connection and the acceptor endpoint on the - // same interface - DIOP::ListenPoint point; - point.host = CORBA::string_dup (local_interface); - point.port = endpoint_addr[index].get_port_number (); - - // Get the count of the number of elements - CORBA::ULong len = listen_point_list.length (); - - // Increase the length by 1 - listen_point_list.length (len + 1); - - // Add the new length to the list - listen_point_list[len] = point; - } - - // @@ This is bad.... - CORBA::string_free (acceptor_interface); - } - - CORBA::string_free (local_interface); - return 1; -} -*/ - -#if 0 -TAO_Connection_Handler * -TAO_DIOP_Transport::invalidate_event_handler_i (void) -{ - TAO_Connection_Handler * eh = this->connection_handler_; - this->connection_handler_ = 0; - return eh; -} -#endif - #endif /* TAO_HAS_DIOP && TAO_HAS_DIOP != 0 */ diff --git a/TAO/tao/Strategies/SHMIOP_Transport.cpp b/TAO/tao/Strategies/SHMIOP_Transport.cpp index c7f845eee81..aaebc6860cf 100644 --- a/TAO/tao/Strategies/SHMIOP_Transport.cpp +++ b/TAO/tao/Strategies/SHMIOP_Transport.cpp @@ -136,7 +136,6 @@ TAO_SHMIOP_Transport::recv (char *buf, } -#if 0 int TAO_SHMIOP_Transport::consolidate_message (ACE_Message_Block &incoming, ssize_t missing_data, @@ -192,7 +191,6 @@ TAO_SHMIOP_Transport::consolidate_message (ACE_Message_Block &incoming, // process that return this->process_parsed_messages (&pqd, rh); } -#endif int TAO_SHMIOP_Transport::send_request (TAO_Stub *stub, diff --git a/TAO/tao/Strategies/SHMIOP_Transport.h b/TAO/tao/Strategies/SHMIOP_Transport.h index 8b54426aaa7..02c67c63116 100644 --- a/TAO/tao/Strategies/SHMIOP_Transport.h +++ b/TAO/tao/Strategies/SHMIOP_Transport.h @@ -82,14 +82,10 @@ protected: size_t len, const ACE_Time_Value *s = 0); -#if 0 - // This no longer exists with the PMB-change flow. Not sure how to deal with that, - // so for now we ditch the method and see if things work. virtual int consolidate_message (ACE_Message_Block &incoming, ssize_t missing_data, TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time); -#endif //@} diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp index 1640644edf9..9923ff1f895 100644 --- a/TAO/tao/Transport.cpp +++ b/TAO/tao/Transport.cpp @@ -19,9 +19,7 @@ #include "Resume_Handle.h" #include "Codeset_Manager.h" #include "Codeset_Translator_Factory.h" -#include "GIOP_Message_State.h" #include "ace/OS_NS_sys_time.h" -#include "ace/Message_Block.h" #include "ace/Reactor.h" @@ -112,15 +110,12 @@ TAO_Transport::TAO_Transport (CORBA::ULong tag, , head_ (0) , tail_ (0) , incoming_message_queue_ (orb_core) - , uncompleted_message_ (0) , current_deadline_ (ACE_Time_Value::zero) , flush_timer_id_ (-1) , transport_timer_ (this) , handler_lock_ (orb_core->resource_factory ()->create_cached_connection_lock ()) , id_ ((size_t) this) , purging_order_ (0) - , recv_buffer_size_ (0) - , sent_byte_count_ (0) , char_translator_ (0) , wchar_translator_ (0) , tcs_set_ (0) @@ -241,9 +236,13 @@ TAO_Transport::generate_request_header ( { // codeset service context is only supposed to be sent in the first request // on a particular connection. - TAO_Codeset_Manager *csm = this->orb_core()->codeset_manager(); - if (csm && this->first_request_) - csm->generate_service_context( opdetails, *this ); + if (this->first_request_) + { + this->orb_core ()->codeset_manager ()->generate_service_context ( + opdetails, + *this + ); + } if (this->messaging_object ()->generate_request_header (opdetails, spec, @@ -605,12 +604,7 @@ int TAO_Transport::schedule_output_i (void) { ACE_Event_Handler *eh = this->event_handler_i (); - if (eh == 0) - return -1; - ACE_Reactor *reactor = eh->reactor (); - if (reactor == 0) - return -1; if (TAO_debug_level > 3) { @@ -626,12 +620,7 @@ int TAO_Transport::cancel_output_i (void) { ACE_Event_Handler *eh = this->event_handler_i (); - if (eh == 0) - return -1; - ACE_Reactor *reactor = eh->reactor (); - if (reactor == 0) - return -1; if (TAO_debug_level > 3) { @@ -750,9 +739,6 @@ TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[]) // no bytes are sent send() can only return 0 or -1 ACE_ASSERT (byte_count != 0); - // Total no. of bytes sent for a send call - this->sent_byte_count_ += byte_count; - if (TAO_debug_level > 4) { ACE_DEBUG ((LM_DEBUG, @@ -776,10 +762,6 @@ TAO_Transport::drain_queue_i (void) // We loop over all the elements in the queue ... TAO_Queued_Message *i = this->head_; - // reset the value so that the counting is done for each new send - // call. - this->sent_byte_count_ = 0; - while (i != 0) { // ... each element fills the iovector ... @@ -838,14 +820,8 @@ TAO_Transport::drain_queue_i (void) if (this->flush_timer_pending ()) { ACE_Event_Handler *eh = this->event_handler_i (); - if (eh != 0) - { - ACE_Reactor *reactor = eh->reactor (); - if (reactor != 0) - { - (void) reactor->cancel_timer (this->flush_timer_id_); - } - } + ACE_Reactor *reactor = eh->reactor (); + reactor->cancel_timer (this->flush_timer_id_); this->reset_flush_timer (); } @@ -922,25 +898,20 @@ TAO_Transport::check_buffering_constraints_i (TAO_Stub *stub, if (set_timer) { ACE_Event_Handler *eh = this->event_handler_i (); - if (eh != 0) - { - ACE_Reactor *reactor = eh->reactor (); - if (reactor != 0) - { - this->current_deadline_ = new_deadline; - ACE_Time_Value delay = - new_deadline - ACE_OS::gettimeofday (); + ACE_Reactor *reactor = eh->reactor (); + this->current_deadline_ = new_deadline; + ACE_Time_Value delay = + new_deadline - ACE_OS::gettimeofday (); - if (this->flush_timer_pending ()) - { - (void) reactor->cancel_timer (this->flush_timer_id_); - } - this->flush_timer_id_ = - reactor->schedule_timer (&this->transport_timer_, - &this->current_deadline_, - delay); - } + if (this->flush_timer_pending ()) + { + reactor->cancel_timer (this->flush_timer_id_); } + + this->flush_timer_id_ = + reactor->schedule_timer (&this->transport_timer_, + &this->current_deadline_, + delay); } return constraints_reached; @@ -1147,18 +1118,6 @@ TAO_Transport::send_message_shared_i (TAO_Stub *stub, return 0; } - -class CTHack -{ -public: - CTHack() { enter(); } - ~CTHack() { leave(); } -private: - void enter() { x = 1; } - void leave() { x = 0; } - int x; -}; - /* * * All the methods relevant to the incoming data path of the ORB are @@ -1170,8 +1129,6 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh, ACE_Time_Value * max_wait_time, int /*block*/) { - CTHack cthack; - if (TAO_debug_level > 3) { ACE_DEBUG ((LM_DEBUG, @@ -1179,8 +1136,9 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh, this->id ())); } - // First try to process messages off the head of the incoming queue. + // First try to process messages of the head of the incoming queue. int retval = this->process_queue_head (rh); + if (retval <= 0) { if (retval == -1) @@ -1191,6 +1149,7 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh, "error while parsing the head of the queue\n", this->id())); } + return retval; } @@ -1199,7 +1158,7 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh, // The buffer on the stack which will be used to hold the input // messages - char buf[TAO_MAXBUFSIZE]; + char buf [TAO_MAXBUFSIZE]; #if defined (ACE_HAS_PURIFY) (void) ACE_OS::memset (buf, @@ -1221,35 +1180,26 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh, ACE_Message_Block::DONT_DELETE, this->orb_core_->input_cdr_msgblock_allocator ()); - // We'll loop trying to complete the message this number of times, - // and that's it. - unsigned int number_of_read_attempts = TAO_MAX_TRANSPORT_REREAD_ATTEMPTS; - - unsigned int did_queue_message = 0; // Align the message block ACE_CDR::mb_align (&message_block); size_t recv_size = 0; + if (this->orb_core_->orb_params ()->single_read_optimization ()) { - recv_size = message_block.space (); + recv_size = + message_block.space (); } else { - recv_size = this->messaging_object ()->header_length (); + recv_size = + this->messaging_object ()->header_length (); } - // Saving the size of the received buffer in case any one needs to - // get the size of the message thats received in the - // context. Obviously the value will be changed for each recv call - // and the user is supposed to invoke the accessor only in the - // invocation context to get meaningful information. - this->recv_buffer_size_ = recv_size; - // Read the message into the message block that we have created on // the stack. - ssize_t n = this->recv (message_block.wr_ptr (), + ssize_t n = this->recv (message_block.rd_ptr (), recv_size, max_wait_time); @@ -1262,7 +1212,7 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh, if (TAO_debug_level > 2) { ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::handle_input_i: " + "TAO (%P|%t) - Transport[%d]::handle_input_i, " "read %d bytes\n", this->id (), n)); } @@ -1270,372 +1220,172 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh, // Set the write pointer in the stack buffer message_block.wr_ptr (n); - if (TAO_debug_level >= 10) - ACE_HEX_DUMP ((LM_DEBUG, - (const char *) message_block.rd_ptr (), - message_block.length (), - ACE_TEXT ("TAO (%P|%t) Transport::handle_input_i(): bytes read from socket"))); - + // Parse the message and try consolidating the message if + // needed. + retval = this->parse_consolidate_messages (message_block, + rh, + max_wait_time); -complete_message_and_possibly_enqueue: - // Check to see if we're still working to complete a message - if (this->uncompleted_message_) + if (retval <= 0) { - // try to complete it + if (retval == -1 && TAO_debug_level > 0) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::handle_input_i, " + "error while parsing and consolidating\n", + this->id ())); + } + return retval; + } - // on exit from this frame we have one of the following states: - // - // (a) an uncompleted message still in uncompleted_message_ - // AND message_block is empty - // - // (b) uncompleted_message_ zero, the completed message at the - // tail of the incoming queue; message_block could be empty - // or still contain bytes + // Make a node of the message block.. + TAO_Queued_Data qd (&message_block, + this->orb_core_->transport_message_buffer_allocator ()); - // ==> repeat - do - { - /* - * Append the "right number of bytes" to uncompleted_message_ - */ - // ==> right_number_of_bytes = MIN(bytes missing from - // uncompleted_message_, length of message_block); - size_t right_number_of_bytes = - ACE_MIN (this->uncompleted_message_->missing_data_bytes_, - message_block.length () ); + // Extract the data for the node.. + this->messaging_object ()->get_message_data (&qd); - if (TAO_debug_level > 2) - { - ACE_DEBUG ((LM_DEBUG, - "(%P|%t) Transport[%d]::handle_input_i: " - "trying to use %u (of %u) " - "bytes to complete message missing %u bytes\n", - this->id (), - right_number_of_bytes, - message_block.length (), - this->uncompleted_message_->missing_data_bytes_)); - } + // Check whether the message was fragmented.. + if (qd.more_fragments_ || + (qd.msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)) + { + // Duplicate the node that we have as the node is on stack.. + TAO_Queued_Data *nqd = + TAO_Queued_Data::duplicate (qd); - // ==> append right_number_of_bytes from message_block - // to uncomplete_message_ & update read pointer of - // message_block; - - // 1. we assume that uncompleted_message_.msg_block_'s - // wr_ptr is properly maintained - // 2. we presume that uncompleted_message_.msg_block was - // allocated with enough space to contain the *entire* - // expected GIOP message, so this copy shouldn't involve an - // additional allocation - this->uncompleted_message_->msg_block_->copy (message_block.rd_ptr (), - right_number_of_bytes); - this->uncompleted_message_->missing_data_bytes_ -= right_number_of_bytes; - message_block.rd_ptr (right_number_of_bytes); - - switch (this->uncompleted_message_->current_state_) - { - case TAO_Queued_Data::WAITING_TO_COMPLETE_HEADER: - { - int hdrvalidity = this->messaging_object()->check_for_valid_header ( - *this->uncompleted_message_->msg_block_); - if (hdrvalidity == 0) - { - // According to the spec, Section 15.4.8, we should send - // the MessageError GIOP message on receipt of "any message...whose - // header is not properly formed (e.g., has the wrong magic value)". - // - // So, rather than returning -1, what we REALLY need to do is - // send a MessageError in reply. - // - // I'm not sure what the best way to trigger that is...probably to - // queue up a special internal-only COMPLETED message that, when - // processed, sends the MessageError as part of its processing. - return -1; - } - else if (hdrvalidity == 1) - { - // ==> update bytes missing from uncompleted_message_ - // with size of message from valid header; - this->messaging_object()->set_queued_data_from_message_header ( - this->uncompleted_message_, - *this->uncompleted_message_->msg_block_); - // ==> change state of uncompleted_event_ to - // WAITING_TO_COMPLETE_PAYLOAD; - this->uncompleted_message_->current_state_ = - TAO_Queued_Data::WAITING_TO_COMPLETE_PAYLOAD; - - // ==> Resize the message block to have capacity for - // the rest of the incoming message - ACE_Message_Block & mb = *this->uncompleted_message_->msg_block_; - ACE_CDR::grow (&mb, - mb.size () - + this->uncompleted_message_->missing_data_bytes_); - - if (TAO_debug_level > 2) - { - ACE_DEBUG ((LM_DEBUG, - "(%P|%t) Transport[%d]::handle_input_i: " - "found a valid header in the message; " - "waiting for %u bytes to complete payload\n", - this->id (), - this->uncompleted_message_->missing_data_bytes_)); - } - - // Continue the loop... - continue; - } - // In the case where we don't have enough information (hdrvalidity == -1), - // we just have to fall through and collect more. -#if 0 - else - { - // What the heck will we do with a bad header? Just - // better to close the connection and let things - // re-train from there. - if (this->uncompleted_message_->msg_block_->length () == - this->messaging_object()->header_length()) - return -1; - -#if 0 // I don't think I need this clause, but I'm leaving it just in case. - // ==> bytes missing from uncompleted_message_ -= right_number_of_bytes; - this->uncompleted_message_->missing_data_bytes_ -= right_number_of_bytes; - ACE_ASSERT (this->uncompleted_message_->missing_data_bytes_ > 0); -#endif - } -#endif - } - break; - - case TAO_Queued_Data::WAITING_TO_COMPLETE_PAYLOAD: - // Here we have an opportunity to try to finish reading the - // uncompleted message. This is a Good Idea(TM) because there are - // good odds that either more data came available since the last - // time we read, or that we simply didn't read the whole message on - // the first read. So, we try to read again. - // - // NOTE! this changes this->uncompleted_message_! - this->try_to_complete (max_wait_time); + return this->consolidate_fragments (nqd, rh); + } - // ==> if (bytes missing from uncompleted_message_ == 0) - if (this->uncompleted_message_->missing_data_bytes_ == 0) - { - /* - * We completed the message! Hooray! - */ - // ==> place uncompleted_message_ (which is now - // complete!) at the tail of the incoming message - // queue; - - // ---> NOTE: whoever pulls this off the queue must delete it! - this->uncompleted_message_->current_state_ - = TAO_Queued_Data::COMPLETED; - - // @@CJC NEED TO CHECK RETURN VALUE HERE! - this->enqueue_incoming_message (this->uncompleted_message_); - did_queue_message = 1; - // zero out uncompleted_message_; - this->uncompleted_message_ = 0; - - if (TAO_debug_level > 2) - { - ACE_DEBUG ((LM_DEBUG, - "(%P|%t) Transport[%d]::handle_input_i: " - "completed and queued message for processing!\n", - this->id ())); - } + // Process the message + return this->process_parsed_messages (&qd, + rh); +} - } - else - { +int +TAO_Transport::parse_consolidate_messages (ACE_Message_Block &block, + TAO_Resume_Handle &rh, + ACE_Time_Value *max_wait_time) +{ + // Parse the incoming message for validity. The check needs to be + // performed by the messaging objects. + if (this->parse_incoming_messages (block) == -1) + { + return -1; + } - if (TAO_debug_level > 2) - { - ACE_DEBUG ((LM_DEBUG, - "(%P|%t) Transport[%d]::handle_input_i: " - "still need %u bytes to complete uncompleted message.\n", - this->id (), - this->uncompleted_message_->missing_data_bytes_)); - } - } - break; + // Check whether we have a complete message for processing + ssize_t missing_data = this->missing_data (block); - default: - // @@CJC What do we do here?! - ACE_ASSERT (! "Transport::handle_input_i: unexpected state" - "in uncompleted_message_"); - } - } - // Does the order of the checks matter? In both (a) and (b), - // message_block is empty, but only in (b) is there no - // uncompleted_message_. - // ==> until (message_block is empty || there is no uncompleted_message_); - // or, rewritten in C++ looping constructs - // ==> while ( ! message_block is empty && there is an uncompleted_message_ ); - while (message_block.length() != 0 && this->uncompleted_message_); - } - - // ***************************** - // @@ CJC - // - // Once upon a time we tried to complete reading the uncompleted - // message here, but testing found that completing later worked - // better. - // ***************************** - - - // At this point, there should be nothing in uncompleted_message_. - // We now need to chop up the bytes in message_block and store any - // complete messages in the incoming message queue. - // - // ==> if (message_block still has data) - if (message_block.length () != 0) - { - TAO_Queued_Data *complete_message = 0; - do - { - if (TAO_debug_level >= 10) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT("TAO (%P|%t) Transport::handle_input_i: ") - ACE_TEXT("extracting complete messages\n"))); - ACE_HEX_DUMP ((LM_DEBUG, - message_block.rd_ptr (), - message_block.length (), - ACE_TEXT (" from this message buffer"))); - } - complete_message = - TAO_Queued_Data::make_completed_message ( - message_block, *this->messaging_object ()); - if (complete_message) - { - this->enqueue_incoming_message (complete_message); - did_queue_message = 1; - } - } - while (complete_message != 0); - // On exit from this frame we have one of the following states: - // (a) message_block is empty - // (b) message_block contains bytes from a partial message - } - - // If, at this point, there's still data in message_block, it's - // an incomplete message. Therefore, we stuff it into the - // uncompleted_message_ and clear out message_block. - // ==> if (message_block still has data) - if (message_block.length () != 0) - { - // duplicate message_block remainder into this->uncompleted_message_ - ACE_ASSERT (this->uncompleted_message_ == 0); - this->uncompleted_message_ = - TAO_Queued_Data::make_uncompleted_message (&message_block, - *this->messaging_object ()); - ACE_ASSERT (this->uncompleted_message_ != 0); - - // In a debug build, we won't reach this point if we couldn't - // create an uncompleted message because the above ASSERT will - // trip. However, in an optimized build, the ASSERT isn't - // there, so we'll go past here. - // - // We could put a check in here similar to the ASSERT condition, - // but doing that would terminate this loop early and result in - // our never processing any completed messages that were received - // in this trip to handle_input_i. - // - // Maybe we could instead queue up a special completed message that, - // when processed, causes the connection to get closed in a non-graceful - // termination scenario. - } - - // We should have consumed ALL the bytes by now. - ACE_ASSERT (message_block.length () == 0); - - // - // We don't want to try to re-read earlier because we may not have - // an uncompleted message until we get to this point. So, if we did - // it earlier, we could have missed the opportunity to complete it - // and dispatch. - // - // Thanks to Bala for the idea to read again - // to increase throughput! - - if (this->uncompleted_message_) - { - if (number_of_read_attempts--) - { - // We try to read again just in case more data arrived while - // we were doing the stuff above. This way, we can increase - // throughput without much of a penalty. + if (missing_data < 0) + { + // If we have more than one message + return this->consolidate_extra_messages (block, + rh); + } + else if (missing_data > 0) + { + // If we have missing data then try doing a read or try queueing + // them. + return this->consolidate_message (block, + missing_data, + rh, + max_wait_time); + } - if (TAO_debug_level > 2) - { - ACE_DEBUG ((LM_DEBUG, - "(%P|%t) Transport[%d]::handle_input_i: " - "still have an uncompleted message; " - "will try %d more times before letting " - "somebody else have a chance.\n", - this->id (), - number_of_read_attempts)); - } + return 1; +} - // We only bother trying to complete payload, not header, because the - // retry only happens in the complete-the-payload clause above. - if (this->uncompleted_message_->current_state_ == - TAO_Queued_Data::WAITING_TO_COMPLETE_PAYLOAD) - goto complete_message_and_possibly_enqueue; - } - else +int +TAO_Transport::parse_incoming_messages (ACE_Message_Block &block) +{ + // If we have a queue and if the last message is not complete a + // complete one, then this read will get us the remaining data. So + // do not try to parse the header if we have an incomplete message + // in the queue. + if (this->incoming_message_queue_.is_tail_complete () != 0) + { + // As it looks like a new message has been read, process the + // message. Call the messaging object to do the parsing.. + int retval = + this->messaging_object ()->parse_incoming_messages (block); + + if (retval == -1) { - // The queue should be empty because it should have been processed - // above. But I wonder if I should put a check in here anyway. if (TAO_debug_level > 2) - { - ACE_DEBUG ((LM_DEBUG, - "(%P|%t) Transport[%d]::handle_input_i: " - "giving up reading for now and returning " - "with incoming queue length = %d\n", - this->id (), - this->incoming_message_queue_.queue_length ())); - if (this->uncompleted_message_) - ACE_DEBUG ((LM_DEBUG, - "(%P|%t) Transport[%d]::handle_input_i: " - "missing bytes from uncompleted message = %u\n", - this->id (), - this->uncompleted_message_->missing_data_bytes_)); - } - return 1; + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::parse_incoming_messages, " + "error in incoming message\n", + this->id ())); + + return -1; } } - // **** END CJC PMG CHANGES **** + return 0; +} + + +size_t +TAO_Transport::missing_data (ACE_Message_Block &incoming) +{ + // If we have a incomplete message in the queue then find out how + // much of data is required to get a complete message. + if (this->incoming_message_queue_.is_tail_complete () == 0) + { + return this->incoming_message_queue_.missing_data_tail (); + } - return did_queue_message ? this->process_queue_head (rh) : 1; + return this->messaging_object ()->missing_data (incoming); } -void -TAO_Transport::try_to_complete (ACE_Time_Value *max_wait_time) +int +TAO_Transport::consolidate_message (ACE_Message_Block &incoming, + ssize_t missing_data, + TAO_Resume_Handle &rh, + ACE_Time_Value *max_wait_time) { - if (this->uncompleted_message_ == 0) - return; + // Check whether the last message in the queue is complete.. + if (this->incoming_message_queue_.is_tail_complete () == 0) + { + return this->consolidate_message_queue (incoming, + missing_data, + rh, + max_wait_time); + } + + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::consolidate_message\n", + this->id ())); + } + + // Calculate the actual length of the load that we are supposed to + // read which is equal to the + length of the buffer + // that we have.. + size_t payload = missing_data + incoming.size (); + + // Grow the buffer to the size of the message + ACE_CDR::grow (&incoming, + payload); ssize_t n = 0; - size_t &missing_data = this->uncompleted_message_->missing_data_bytes_; - ACE_Message_Block &mb = *this->uncompleted_message_->msg_block_; - // Try to complete this until we error or block right here... - for (ssize_t bytes = missing_data; - bytes != 0; - bytes -= n) + // As this used for transports where things are available in one + // shot this looping should not create any problems. + for (ssize_t bytes = missing_data; bytes != 0; bytes -= n) { // .. do a read on the socket again. - n = this->recv (mb.wr_ptr (), + n = this->recv (incoming.wr_ptr (), bytes, max_wait_time); if (TAO_debug_level > 6) { ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::handle_input_i, " + "TAO (%P|%t) - Transport[%d]::consolidate_message, " "read %d bytes on attempt\n", this->id(), n)); } @@ -1645,161 +1395,366 @@ TAO_Transport::try_to_complete (ACE_Time_Value *max_wait_time) break; } - mb.wr_ptr (n); + incoming.wr_ptr (n); missing_data -= n; } + + // If we got an error.. + if (n == -1) + { + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Trasport[%d]::consolidate_message, " + "error while trying to consolidate\n", + this->id ())); + } + + return -1; + } + + // If we had gotten a EWOULDBLOCK n would be equal to zero. But we + // have to put the message in the queue anyway. So let us proceed + // to do that and return... + + // Check to see if we have messages in queue or if we have missing + // data . AT this point we cannot have have semi-complete messages + // in the queue as they would have been taken care before. Put + // ourselves in the queue and then try processing one of the + // messages.. + if ((missing_data > 0 + ||this->incoming_message_queue_.queue_length ()) + && this->incoming_message_queue_.is_tail_fragmented () == 0) + { + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::consolidate_message, " + "queueing up the message\n", + this->id ())); + } + + // Get a queued data + TAO_Queued_Data *qd = + this->make_queued_data (incoming); + + // Add the missing data to the queue + qd->missing_data_ = missing_data; + + // Get the rest of the messaging data + this->messaging_object ()->get_message_data (qd); + + // Add it to the tail of the queue.. + this->incoming_message_queue_.enqueue_tail (qd); + + if (this->incoming_message_queue_.is_head_complete ()) + { + return this->process_queue_head (rh); + } + + return 0; + } + + // We dont have any missing data. Just make a queued_data node with + // the existing message block and send it to the higher layers of + // the ORB. + TAO_Queued_Data pqd (&incoming, + this->orb_core_->transport_message_buffer_allocator ()); + pqd.missing_data_ = missing_data; + this->messaging_object ()->get_message_data (&pqd); + + // Check whether the message was fragmented and try to consolidate + // the fragments.. + if (pqd.more_fragments_ || + (pqd.msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)) + { + // Duplicate the queued data as it is on stack.. + TAO_Queued_Data *nqd = TAO_Queued_Data::duplicate (pqd); + + return this->consolidate_fragments (nqd, rh); + } + + // Now we have a full message in our buffer. Just go ahead and + // process that + return this->process_parsed_messages (&pqd, + rh); } +int +TAO_Transport::consolidate_fragments (TAO_Queued_Data *qd, + TAO_Resume_Handle &rh) +{ + // If we have received a fragment message then we have to + // consolidate with the last message in queue + // @@todo: this piece of logic follows GIOP a bit... Need to revisit + // if we have protocols other than GIOP + + // @@todo: Fragments now have too much copying overhead. Need to get + // them out if we want to have some reasonable performance metrics + // in future.. Post 1.2 seems a nice time.. + if (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT) + { + TAO_Queued_Data *tqd = + this->incoming_message_queue_.dequeue_tail (); + + tqd->more_fragments_ = qd->more_fragments_; + tqd->missing_data_ = qd->missing_data_; + + if (this->messaging_object ()->consolidate_fragments (tqd, qd) == -1) + { + return -1; + } + + TAO_Queued_Data::release (qd); + this->incoming_message_queue_.enqueue_tail (tqd); + this->process_queue_head (rh); + } + else + { + // if we dont have a fragment already in the queue just add it in + // the queue + this->incoming_message_queue_.enqueue_tail (qd); + } + + return 0; +} int -TAO_Transport::enqueue_incoming_message (TAO_Queued_Data *queueable_message) +TAO_Transport::consolidate_message_queue (ACE_Message_Block &incoming, + ssize_t missing_data, + TAO_Resume_Handle &rh, + ACE_Time_Value *max_wait_time) { - // Get the GIOP version - CORBA::Octet major = queueable_message->major_version_; - CORBA::Octet minor = queueable_message->minor_version_; - CORBA::UShort whole = major << 8 | minor; - - // Set up a couple of pointers that are shared by the code - // for the different GIOP versions. - ACE_Message_Block *mb = 0; - TAO_Queued_Data *fragment_message = 0; - - switch(whole) - { - case 0x0100: // GIOP 1.0 - if (!queueable_message->more_fragments_) - return this->incoming_message_queue_.enqueue_tail ( - queueable_message); - - // Fragments aren't supported in 1.0. This is an error and - // we should reject it somehow. What do we do here? Do we throw - // an exception to the receiving side? Do we throw an exception - // to the sending side? - // - // At the very least, we need to log the fact that we received - // nonsense. - ACE_ERROR_RETURN ((LM_ERROR, - "TAO (%P|%t) - " - "TAO_Transport::enqueue_incoming_message " - "detected a fragmented GIOP 1.0 message\n"), - -1); - break; - case 0x0101: // GIOP 1.1 - // In 1.1, fragments kinda suck because they don't have they're - // own message-specific header. Therefore, we have to do the - // following: - fragment_message = - this->incoming_message_queue_.find_fragment (major, minor); - - // No fragment was found - if (fragment_message == 0) - return this->incoming_message_queue_.enqueue_tail ( - queueable_message); - - if (queueable_message->more_fragments_) + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::consolidate_message_queue\n", + this->id ())); + } + + // If the queue did not have a complete message put this piece of + // message in the queue. We know it did not have a complete + // message. That is why we are here. + size_t n = + this->incoming_message_queue_.copy_tail (incoming); + + if (TAO_debug_level > 6) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, " + "copied [%d] bytes to the tail\n", + this->id (), + n)); + } + + // Update the missing data... + missing_data = + this->incoming_message_queue_.missing_data_tail (); + + if (TAO_debug_level > 6) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, " + "missing [%d] bytes in the tail message\n", + this->id (), + missing_data)); + } + + // Move the read pointer of the message block to the end + // of the copied message and process the remaining portion... + incoming.rd_ptr (n); + + // If we have some more information left in the message block.. + if (incoming.length ()) + { + // We may have to parse & consolidate. This part of the message + // doesn't seem to be part of the last message in the queue (as + // the copy () hasn't taken away this message). + int retval = this->parse_consolidate_messages (incoming, + rh, + max_wait_time); + + // If there is an error return + if (retval == -1) { - // Find the last message block in the continuation - mb = fragment_message->msg_block_; - while (mb->cont () != 0) - mb = mb->cont (); - - // Add the current message block to the end of the chain - // after adjusting the read pointer to skip the GIOP header - queueable_message->msg_block_->rd_ptr(TAO_GIOP_MESSAGE_HEADER_LEN); - mb->cont (queueable_message->msg_block_); - - // Get rid of the queuable message but save the message block - queueable_message->msg_block_ = 0; - queueable_message->release (); - - // One note is that TAO_Queued_Data contains version numbers, - // but doesn't indicate the actual protocol to which those - // version numbers refer. That's not a problem, though, because - // instances of TAO_Queued_Data live in a queue, and that queue - // lives in a particular instance of a Transport, and the - // transport instance has an association with a particular - // messaging_object. The concrete messaging object embodies a - // messaging protocol, and must cover all versions of that - // protocol. Therefore, we just need to cover the bases of all - // versions of that one protocol. + if (TAO_debug_level) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, " + "error while consolidating, part of the read message\n", + this->id ())); + } + return retval; } - else + else if (retval == 1) { - // There is a complete chain of fragments - fragment_message->consolidate (); + // If the message in the message block has only + // one message left we need to process that seperately. + + // Get a queued data + TAO_Queued_Data *qd = this->make_queued_data (incoming); + + // Get the rest of the message data + this->messaging_object ()->get_message_data (qd); + + // Add the missing data to the queue + qd->missing_data_ = 0; - // Go ahead and enqueue this message - return this->incoming_message_queue_.enqueue_tail ( - queueable_message); + // Check whether the message was fragmented and try to consolidate + // the fragments.. + if (qd->more_fragments_ || + (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)) + { + return this->consolidate_fragments (qd, rh); + } + + // Add it to the tail of the queue.. + this->incoming_message_queue_.enqueue_tail (qd); + + // We should surely have a message in queue now. So just + // process that. + return this->process_queue_head (rh); } - break; - case 0x0102: // GIOP 1.2 - // In 1.2, we get a little more context. There's a - // FRAGMENT message-specific header, and inside that is the - // request id with which the fragment is associated. - fragment_message = - this->incoming_message_queue_.find_fragment ( - queueable_message->request_id_); - - // No fragment was found - if (fragment_message == 0) - return this->incoming_message_queue_.enqueue_tail ( - queueable_message); - - if (fragment_message->major_version_ != major || - fragment_message->minor_version_ != minor) - ACE_ERROR_RETURN ((LM_ERROR, - "TAO (%P|%t) - " - "TAO_Transport::enqueue_incoming_message " - "GIOP versions do not match " - "(%d.%d != %d.%d\n", - fragment_message->major_version_, - fragment_message->minor_version_, - major, minor), - -1); - - // Find the last message block in the continuation - mb = fragment_message->msg_block_; - while (mb->cont () != 0) - mb = mb->cont (); - - // Add the current message block to the end of the chain - // after adjusting the read pointer to skip the GIOP header - queueable_message->msg_block_->rd_ptr(TAO_GIOP_MESSAGE_HEADER_LEN + - TAO_GIOP_MESSAGE_FRAGMENT_HEADER); - mb->cont (queueable_message->msg_block_); - - // Remove our reference to the message block. At this point - // the message block of the fragment head owns it as part of a - // chain - queueable_message->msg_block_ = 0; - - if (!queueable_message->more_fragments_) + + // parse_consolidate_messages () would have processed one of the + // messages, so we better return as we dont want to starve other + // threads. + return 0; + } + + // If we still have some missing data.. + if (missing_data > 0) + { + // Get the last message from the Queue + TAO_Queued_Data *qd = + this->incoming_message_queue_.dequeue_tail (); + + if (TAO_debug_level > 5) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, " + "trying recv, again\n", + this->id ())); + } + + // Try to do a read again. If we have some luck it would be + // great.. + ssize_t n = this->recv (qd->msg_block_->wr_ptr (), + missing_data, + max_wait_time); + + if (TAO_debug_level > 5) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, " + "recv retval [%d]\n", + this->id (), + n)); + } + + // Error... + if (n < 0) + { + return n; + } + + // If we get a EWOULDBLOCK ie. n==0, we should anyway put the + // message in queue before returning.. + // Move the write pointer + qd->msg_block_->wr_ptr (n); + + // Decrement the missing data + qd->missing_data_ -= n; + + // Now put the TAO_Queued_Data back in the queue + this->incoming_message_queue_.enqueue_tail (qd); + + // Any way as we have come this far and are about to return, + // just try to process a message if it is there in the queue. + if (this->incoming_message_queue_.is_head_complete ()) { - // This is the end of the fragments for this request - fragment_message->consolidate (); + return this->process_queue_head (rh); } - // Get rid of the queuable message - queueable_message->release (); - break; - default: - if (!queueable_message->more_fragments_) - return this->incoming_message_queue_.enqueue_tail ( - queueable_message); - // This is an unknown GIOP version - ACE_ERROR_RETURN ((LM_ERROR, - "TAO (%P|%t) - " - "TAO_Transport::enqueue_incoming_message " - "can not handle a fragmented GIOP %d.%d " - "message\n", major, minor), - -1); + return 0; } - return 0; + // Process a message in the head of the queue if we have one.. + return this->process_queue_head (rh); } +int +TAO_Transport::consolidate_extra_messages (ACE_Message_Block + &incoming, + TAO_Resume_Handle &rh) +{ + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::consolidate_extra_messages\n", + this->id ())); + } + + // Pick the tail of the queue + TAO_Queued_Data *tail = + this->incoming_message_queue_.dequeue_tail (); + + if (tail) + { + // If we have a node in the tail, checek to see whether it needs + // consolidation. If so, just consolidate it. + if (this->messaging_object ()->consolidate_node (tail, incoming) == -1) + { + return -1; + } + + // .. put the tail back in queue.. + this->incoming_message_queue_.enqueue_tail (tail); + } + + int retval = 1; + + if (TAO_debug_level > 6) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::consolidate_extra_messages, " + "extracting extra messages\n", + this->id ())); + } + + // Extract messages.. + while (retval == 1) + { + TAO_Queued_Data *q_data = 0; + + retval = + this->messaging_object ()->extract_next_message (incoming, + q_data); + if (q_data) + { + // If we have read a framented message then... + if (q_data->more_fragments_ || + q_data->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT) + { + this->consolidate_fragments (q_data, rh); + } + else + { + this->incoming_message_queue_.enqueue_tail (q_data); + } + } + } + + // In case of error return.. + if (retval == -1) + { + return retval; + } + + return this->process_queue_head (rh); +} + int TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd, TAO_Resume_Handle &rh) @@ -1807,6 +1762,8 @@ TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd, // Get the that we have received TAO_Pluggable_Message_Type t = qd->msg_type_; + // int result = 0; + if (t == TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION) { if (TAO_debug_level > 0) @@ -1871,69 +1828,126 @@ TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd, return 0; } -int -TAO_Transport::process_queue_head (TAO_Resume_Handle &rh) +TAO_Queued_Data * +TAO_Transport::make_queued_data (ACE_Message_Block &incoming) { - if (TAO_debug_level > 3) + // Get an instance of TAO_Queued_Data + TAO_Queued_Data *qd = + TAO_Queued_Data::get_queued_data ( + this->orb_core_->transport_message_buffer_allocator ()); + + // Get the flag for the details of the data block... + ACE_Message_Block::Message_Flags flg = + incoming.self_flags (); + + if (ACE_BIT_DISABLED (flg, + ACE_Message_Block::DONT_DELETE)) { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::process_queue_head\n", - this->id ())); + // Duplicate the data block before putting it in the queue. + qd->msg_block_ = ACE_Message_Block::duplicate (&incoming); } + else + { + // As we are in CORBA mode, all the data blocks would be aligned + // on an 8 byte boundary. Hence create a data block for more + // than the actual length + ACE_Data_Block *db = + this->orb_core_->create_input_cdr_data_block (incoming.length ()+ + ACE_CDR::MAX_ALIGNMENT); - if (this->incoming_message_queue_.is_head_complete () != 1) - return 1; + // Get the allocator.. + ACE_Allocator *alloc = + this->orb_core_->input_cdr_msgblock_allocator (); - // Get the message on the head of the queue.. - TAO_Queued_Data *qd = - this->incoming_message_queue_.dequeue_head (); + // Make message block.. + ACE_Message_Block mb (db, + 0, + alloc); + + // Duplicate the block.. + qd->msg_block_ = mb.duplicate (); + + // Align the message block + ACE_CDR::mb_align (qd->msg_block_); + + // Copy the data.. + qd->msg_block_->copy (incoming.rd_ptr (), + incoming.length ()); + } + + + return qd; +} +int +TAO_Transport::process_queue_head (TAO_Resume_Handle &rh) +{ if (TAO_debug_level > 3) { ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::process_queue_head, " - "the size of the queue is [%d]\n", - this->id (), - this->incoming_message_queue_.queue_length())); + "TAO (%P|%t) - Transport[%d]::process_queue_head\n", + this->id ())); } - // Now that we have pulled out out one message out of the queue, - // check whether we have one more message in the queue... - if (this->incoming_message_queue_.queue_length () > 0) + + // See if the message in the head of the queue is complete... + if (this->incoming_message_queue_.is_head_complete () > 0) { - if (TAO_debug_level > 0) + // Get the message on the head of the queue.. + TAO_Queued_Data *qd = + this->incoming_message_queue_.dequeue_head (); + + if (TAO_debug_level > 3) { ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - Transport[%d]::process_queue_head, " - "notify reactor\n", - this->id ())); + "the size of the queue is [%d]\n", + this->id (), + this->incoming_message_queue_.queue_length())); + } + // Now that we have pulled out out one message out of the queue, + // check whether we have one more message in the queue... + if (this->incoming_message_queue_.is_head_complete () > 0) + { + if (TAO_debug_level > 0) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::process_queue_head, " + "notify reactor\n", + this->id ())); + } + int retval = + this->notify_reactor (); + + if (retval == 1) + { + // Let the class know that it doesn't need to resume the + // handle.. + rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED); + } + else if (retval < 0) + return -1; + } + else + { + // As we are ready to process the last message just resume + // the handle. Set the flag incase someone had reset the flag.. + rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE); } - int retval = - this->notify_reactor (); - if (retval == 1) + // Process the message... + if (this->process_parsed_messages (qd, rh) == -1) { - // Let the class know that it doesn't need to resume the - // handle.. - rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED); + return -1; } - else if (retval < 0) - return -1; - } - else - { - // As we are ready to process the last message just resume - // the handle. Set the flag incase someone had reset the flag.. - rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE); - } - // Process the message... - int retval = this->process_parsed_messages (qd, rh); + // Delete the Queued_Data.. + TAO_Queued_Data::release (qd); - // Delete the Queued_Data.. - TAO_Queued_Data::release (qd); + return 0; + } - return (retval == -1) ? -1 : 0; + return 1; } int @@ -1945,8 +1959,6 @@ TAO_Transport::notify_reactor (void) } ACE_Event_Handler *eh = this->event_handler_i (); - if (eh == 0) - return -1; // Get the reactor associated with the event handler ACE_Reactor *reactor = this->orb_core ()->reactor (); @@ -1983,18 +1995,6 @@ TAO_Transport::transport_cache_manager (void) return this->orb_core_->lane_resources ().transport_cache (); } -size_t -TAO_Transport::recv_buffer_size (void) -{ - return this->recv_buffer_size_; -} - -size_t -TAO_Transport::sent_byte_count (void) -{ - return this->sent_byte_count_; -} - void TAO_Transport::assign_translators (TAO_InputCDR *inp, TAO_OutputCDR *outp) { diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h index 4b885eb40e5..b6059b211fe 100644 --- a/TAO/tao/Transport.h +++ b/TAO/tao/Transport.h @@ -447,6 +447,7 @@ public: virtual ACE_Event_Handler * event_handler_i (void) = 0; protected: + virtual TAO_Connection_Handler * connection_handler_i (void) = 0; public: @@ -488,7 +489,6 @@ public: virtual int handle_input (TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time = 0, int block = 0); - void try_to_complete (ACE_Time_Value *max_wait_time); enum { @@ -568,11 +568,60 @@ public: protected: + /// Called by the handle_input_i (). This method is used to parse + /// message read by the handle_input_i () call. It also decides + /// whether the message needs consolidation before processing. + int parse_consolidate_messages (ACE_Message_Block &bl, + TAO_Resume_Handle &rh, + ACE_Time_Value *time = 0); + + + /// Method does parsing of the message if we have a fresh message in + /// the or just returns if we have read part of the + /// previously stored message. + int parse_incoming_messages (ACE_Message_Block &message_block); + + /// Return if we have any missing data in the queue of messages + /// or determine if we have more information left out in the + /// presently read message to make it complete. + size_t missing_data (ACE_Message_Block &message_block); + + /// Consolidate the currently read message or consolidate the last + /// message in the queue. The consolidation of the last message in + /// the queue is done by calling consolidate_message_queue (). + virtual int consolidate_message (ACE_Message_Block &incoming, + ssize_t missing_data, + TAO_Resume_Handle &rh, + ACE_Time_Value *max_wait_time); + + /// @@Bala: Docu??? + int consolidate_fragments (TAO_Queued_Data *qd, + TAO_Resume_Handle &rh); + + /// First consolidate the message queue. If the message is still not + /// complete, try to read from the handle again to make it + /// complete. If these dont help put the message back in the queue + /// and try to check the queue if we have message to process. (the + /// thread needs to do some work anyway :-)) + int consolidate_message_queue (ACE_Message_Block &incoming, + ssize_t missing_data, + TAO_Resume_Handle &rh, + ACE_Time_Value *max_wait_time); + + /// Called by parse_consolidate_message () if we have more messages + /// in one read. Queue up the messages and try to process one of + /// them, atleast at the head of them. + int consolidate_extra_messages (ACE_Message_Block &incoming, + TAO_Resume_Handle &rh); + /// Process the message by sending it to the higher layers of the /// ORB. int process_parsed_messages (TAO_Queued_Data *qd, TAO_Resume_Handle &rh); + /// Make a queued data from the message block + TAO_Queued_Data *make_queued_data (ACE_Message_Block &incoming); + /// Implement send_message_shared() assuming the handler_lock_ is /// held. int send_message_shared_i (TAO_Stub *stub, @@ -620,40 +669,6 @@ public: int handle_timeout (const ACE_Time_Value ¤t_time, const void* act); - /// Accessor to recv_buffer_size_ - size_t recv_buffer_size (void); - - /// Accessor to sent_byte_count_ - size_t sent_byte_count (void); - - - /*! - \name Incoming Queue Methods - */ - //@{ - /*! - \brief Queue up \a queueable_message as a completely-received incoming message. - - This method queues up a completely-received queueable GIOP message - (i.e., it must be dynamically-allocated). It does not assemble a - complete GIOP message; that should be done prior to calling this - message, and is currently done in handle_input_i. - - This does, however, assure that a completely-received GIOP - FRAGMENT gets associated with any previously-received related - fragments. It does this through collaboration with the messaging - object (since fragment reassembly is protocol specific). - - \param queueable_message instance as returned by one of the TAO_Queued_Data::make_*_message that's been completely received - - \return 0 successfully enqueued \a queueable_message - - \return -1 failed to enqueue \a queueable_message - \todo How do we indicate \em what may have failed? - */ - int enqueue_incoming_message (TAO_Queued_Data *queueable_message); - //@} - /// CodeSet Negotiation - Get the char codeset translator factory /// TAO_Codeset_Translator_Factory *char_translator (void) const; @@ -777,14 +792,10 @@ private: /// Print out error messages if the event handler is not valid void report_invalid_event_handler (const char *caller); - /** + /* * Process the message that is in the head of the incoming queue. * If there are more messages in the queue, this method calls - * this->notify_reactor () to wake up a thread. - * - * \return -1 An error occurred; occurs independent presence of messages in the queue. - * \return 1 No messages in the queue to process; nothing processed. - * \return 0 Messages were in the queue to process and one got processed. + * this->notify_reactor () to wake up a thread */ int process_queue_head (TAO_Resume_Handle &rh); @@ -845,12 +856,9 @@ protected: TAO_Queued_Message *head_; TAO_Queued_Message *tail_; - /// Queue of the completely-received incoming messages.. + /// Queue of the incoming messages.. TAO_Incoming_Message_Queue incoming_message_queue_; - /// Place to hold a partially-received (waiting-to-be-completed) message - TAO_Queued_Data * uncompleted_message_; - /// The queue will start draining no later than /// *if* the deadline is ACE_Time_Value current_deadline_; @@ -885,11 +893,8 @@ protected: /// Used by the LRU, LFU and FIFO Connection Purging Strategies. unsigned long purging_order_; - /// Size of the buffer received. - size_t recv_buffer_size_; +private: - /// Number of bytes sent. - size_t sent_byte_count_; /// @@Phil, I think it would be nice if we could think of a way to /// do the following. /// We have been trying to use the transport for marking about @@ -902,7 +907,6 @@ protected: /// we can move this to the connection_handler and it may more sense /// with the DSCP stuff around there. Do you agree? -private: /// Additional member values required to support codeset translation TAO_Codeset_Translator_Factory *char_translator_; TAO_Codeset_Translator_Factory *wchar_translator_; -- cgit v1.2.1