From d93020833c3837bde053fbbeddb27b28f010ebbd Mon Sep 17 00:00:00 2001 From: bala Date: Wed, 4 Jul 2001 23:40:11 +0000 Subject: ChangeLogTag:Wed Jul 4 16:21:22 2001 Balachandran Natarajan --- TAO/tao/GIOP_Message_Base.cpp | 53 ++-------------- TAO/tao/GIOP_Message_Base.h | 16 +++-- TAO/tao/GIOP_Message_State.cpp | 8 +-- TAO/tao/GIOP_Message_State.h | 41 +++--------- TAO/tao/GIOP_Message_State.i | 11 ++-- TAO/tao/GIOP_Message_State.inl | 17 +++-- TAO/tao/Incoming_Message_Queue.cpp | 7 ++- TAO/tao/Pluggable_Messaging.cpp | 6 -- TAO/tao/Pluggable_Messaging.h | 18 +++--- TAO/tao/Transport.cpp | 126 ++++++++++++++++++++++++++----------- TAO/tao/Transport.h | 5 +- 11 files changed, 151 insertions(+), 157 deletions(-) diff --git a/TAO/tao/GIOP_Message_Base.cpp b/TAO/tao/GIOP_Message_Base.cpp index b51a0c79f86..a5edf3980d8 100644 --- a/TAO/tao/GIOP_Message_Base.cpp +++ b/TAO/tao/GIOP_Message_Base.cpp @@ -46,10 +46,9 @@ TAO_GIOP_Message_Base::init (CORBA::Octet major, void -TAO_GIOP_Message_Base::reset (int /*reset_flag*/) +TAO_GIOP_Message_Base::reset (void) { - // Reset the message state - // this->message_handler_.reset (reset_flag); + // no-op } int @@ -172,42 +171,7 @@ TAO_GIOP_Message_Base::read_message (TAO_Transport * /*transport*/, int /*block */, ACE_Time_Value * /*max_wait_time*/) { -#if 0 - // Call the handler to read and do a simple parse of the header of - // the message. - int retval = - this->message_handler_.read_messages (transport); - - if (retval < 1) - return retval; - - retval = this->message_handler_.parse_message_header (); - - - // Error in the message that was received - if (retval == -1) - return -1; - // If -2, we want the reactor to call us back, so return 1 - else if (retval == -2) - return 1; - - if (retval != 0) - { - // Get the message state - TAO_GIOP_Message_State &state = - this->message_handler_.message_state (); - - // Set the state internally for parsing and generating messages - this->set_state (state.giop_version.major, - state.giop_version.minor); - } - -#endif /* if 0*/ - // We return 2, it is ugly. But the reactor semantics has made us to - // limp :( - return 2; - - + return 0; } int @@ -310,14 +274,13 @@ TAO_GIOP_Message_Base::parse_incoming_messages (ACE_Message_Block &incoming) return -1; } - return 0; } ssize_t TAO_GIOP_Message_Base::missing_data (ACE_Message_Block &incoming) { - //Actual message size including the header.. + // Actual message size including the header.. CORBA::ULong msg_size = this->message_state_.message_size (); @@ -338,9 +301,6 @@ int TAO_GIOP_Message_Base::extract_next_message (ACE_Message_Block &incoming, TAO_Queued_Data *&qd) { - //ACE_DEBUG ((LM_DEBUG, - // "TAO (%P|%t) Extracting extra messages... \n")); - TAO_GIOP_Message_State state (this->orb_core_, this); @@ -367,9 +327,6 @@ TAO_GIOP_Message_Base::extract_next_message (ACE_Message_Block &incoming, size_t copying_len = state.message_size (); - // ACE_DEBUG ((LM_DEBUG, - // "TAO (%P|%t) ... queueing messages.. \n")); - qd = this->make_queued_data (copying_len); if (copying_len > incoming.length ()) @@ -505,7 +462,7 @@ TAO_GIOP_Message_Base::get_message_data (TAO_Queued_Data *qd) this->message_type (this->message_state_); // Reset the message_state - // this->message_state_.reset (0); + this->message_state_.reset (); } int diff --git a/TAO/tao/GIOP_Message_Base.h b/TAO/tao/GIOP_Message_Base.h index 1f89053be60..280657fd03b 100644 --- a/TAO/tao/GIOP_Message_Base.h +++ b/TAO/tao/GIOP_Message_Base.h @@ -58,7 +58,7 @@ public: CORBA::Octet minor); /// Reset the messaging the object - virtual void reset (int reset_flag = 1); + virtual void reset (void); /// Write the RequestHeader in to the stream. The underlying /// implementation of the mesaging should do the right thing. @@ -93,21 +93,25 @@ public: /// the message. virtual int format_message (TAO_OutputCDR &cdr); - - - - /// @@Bala:Documentation please... + /// Parse the incoming messages.. virtual int parse_incoming_messages (ACE_Message_Block &message_block); - /// @@Bala:Documentation please.. + /// Calculate the amount of data that is missing in the + /// message block. virtual ssize_t missing_data (ACE_Message_Block &message_block); + /* Extract the details of the next message from the + * through . Returns 1 if there are more messages and returns a + * 0 if there are no more messages in . + */ virtual int extract_next_message (ACE_Message_Block &incoming, TAO_Queued_Data *&qd); + /// Check whether the node needs consolidation from virtual int consolidate_node (TAO_Queued_Data *qd, ACE_Message_Block &incoming); + /// Get the details of the message parsed through the . virtual void get_message_data (TAO_Queued_Data *qd); /// Process the request message that we have received on the diff --git a/TAO/tao/GIOP_Message_State.cpp b/TAO/tao/GIOP_Message_State.cpp index 1b433dfdf39..36ebdcd41ae 100644 --- a/TAO/tao/GIOP_Message_State.cpp +++ b/TAO/tao/GIOP_Message_State.cpp @@ -25,10 +25,8 @@ TAO_GIOP_Message_State::TAO_GIOP_Message_State ( message_size_ (0), request_id_ (0), more_fragments_ (0), - missing_data_ (0), - message_status_ (TAO_GIOP_WAITING_FOR_HEADER) + missing_data_ (0) { - } @@ -92,10 +90,6 @@ TAO_GIOP_Message_State::parse_message_header_i (ACE_Message_Block &incoming) incoming.length ()); } - // The GIOP header has been parsed. Set the status to wait for - // payload - this->message_status_ = TAO_GIOP_WAITING_FOR_PAYLOAD; - return 0; } diff --git a/TAO/tao/GIOP_Message_State.h b/TAO/tao/GIOP_Message_State.h index 8a9a878f597..3f262373f54 100644 --- a/TAO/tao/GIOP_Message_State.h +++ b/TAO/tao/GIOP_Message_State.h @@ -34,16 +34,9 @@ class TAO_GIOP_Message_Base; * * @brief Generic definitions for Message States. * - * @@Bala: More documentation please... - * - * This represents the state of the incoming GIOP message - * As the ORB processes incoming messages it needs to keep track of - * how much of the message has been read, if there are any - * fragments following this message etc. + * This helps to establish the state of the incoming messages. */ - - class TAO_Export TAO_GIOP_Message_State { public: @@ -52,19 +45,6 @@ public: TAO_GIOP_Message_State (TAO_ORB_Core *orb_core, TAO_GIOP_Message_Base *base); - enum TAO_GIOP_Message_Status - { - /// The header of the message hasn't shown up yet. - TAO_GIOP_WAITING_FOR_HEADER = 0, - - /// The payload hasn't fully shown up in the yet - TAO_GIOP_WAITING_FOR_PAYLOAD, - - /// The message read has got multiple requests - TAO_GIOP_MULTIPLE_MESSAGES - }; - - /// @@Bala: Documentation please... /// Parse the message header. int parse_message_header (ACE_Message_Block &incoming); @@ -77,13 +57,14 @@ public: /// Return the byte order information CORBA::Octet byte_order (void) const; + /// Reset the state.. + void reset (void); + private: friend class TAO_GIOP_Message_Base; - - - /// @@Bala: Documentation please... + /// Parse the message header. int parse_message_header_i (ACE_Message_Block &incoming); /// Checks for the magic word 'GIOP' in the start of the incoing @@ -114,6 +95,7 @@ private: private: + // GIOP version information.. TAO_GIOP_Message_Version giop_version_; /// 0 = big, 1 = little @@ -134,7 +116,7 @@ private: * chain is reassembled into the main message block that is sent * along */ - ACE_Message_Block fragmented_messages; + // ACE_Message_Block fragmented_messages; /** @@ -148,11 +130,11 @@ private: * 3) Even if we allowed that at this layer the CDR classes are * not prepared to handle that. */ - CORBA::Octet first_fragment_byte_order; + // CORBA::Octet first_fragment_byte_order; /// The GIOP version for the first fragment /// @@ Same as above, all GIOP versions must match. - TAO_GIOP_Message_Version first_fragment_giop_version; + // TAO_GIOP_Message_Version first_fragment_giop_version; /** * If the messages are chained this represents the message type for @@ -160,16 +142,13 @@ private: * fragment and the upper level needs to know if it is a request, * locate request or what). */ - CORBA::Octet first_fragment_message_type; + // CORBA::Octet first_fragment_message_type; /// (Requests and Replys) CORBA::Octet more_fragments_; /// Missing data CORBA::ULong missing_data_; - - /// @@Bala: Documentation?? - TAO_GIOP_Message_Status message_status_; }; diff --git a/TAO/tao/GIOP_Message_State.i b/TAO/tao/GIOP_Message_State.i index b2633815ebf..773faefe69b 100644 --- a/TAO/tao/GIOP_Message_State.i +++ b/TAO/tao/GIOP_Message_State.i @@ -2,16 +2,13 @@ //$Id$ -// **************************************************************** -// @@ Bala: we use the stars to separate classes in ACE+TAO - // // Inlined methods for TAO_GIOP_Message_State // ACE_INLINE int TAO_GIOP_Message_State::message_fragmented (void) { - if (this->more_fragments) + if (this->more_fragments_) return 1; return 0; @@ -20,12 +17,12 @@ TAO_GIOP_Message_State::message_fragmented (void) ACE_INLINE void TAO_GIOP_Message_State::reset (int /*reset_contents*/) { - this->message_size = 0; - this->more_fragments = 0; + this->message_size_ = 0; + this->more_fragments_ = 0; } ACE_INLINE CORBA::Boolean TAO_GIOP_Message_State::header_received (void) const { - return this->message_size != 0; + return this->message_size_ != 0; } diff --git a/TAO/tao/GIOP_Message_State.inl b/TAO/tao/GIOP_Message_State.inl index cf326079aad..b6d2989eaa9 100644 --- a/TAO/tao/GIOP_Message_State.inl +++ b/TAO/tao/GIOP_Message_State.inl @@ -27,6 +27,16 @@ TAO_GIOP_Message_State::byte_order (void) const return this->byte_order_; } +ACE_INLINE void +TAO_GIOP_Message_State::reset (void) +{ + this->message_type_ = 0; + this->message_size_ = 0; + this->more_fragments_ = 0; + this->request_id_ = 0; + this->missing_data_ = 0; +} + #if 0 ACE_INLINE int TAO_GIOP_Message_State::message_fragmented (void) @@ -37,12 +47,7 @@ TAO_GIOP_Message_State::message_fragmented (void) return 0; } -ACE_INLINE void -TAO_GIOP_Message_State::reset (int /*reset_contents*/) -{ - this->message_size = 0; - this->more_fragments = 0; -} + ACE_INLINE CORBA::Boolean TAO_GIOP_Message_State::header_received (void) const diff --git a/TAO/tao/Incoming_Message_Queue.cpp b/TAO/tao/Incoming_Message_Queue.cpp index 37a6b86f762..17f8b31d6b9 100644 --- a/TAO/tao/Incoming_Message_Queue.cpp +++ b/TAO/tao/Incoming_Message_Queue.cpp @@ -19,7 +19,12 @@ TAO_Incoming_Message_Queue::TAO_Incoming_Message_Queue (TAO_ORB_Core *orb_core) TAO_Incoming_Message_Queue::~TAO_Incoming_Message_Queue (void) { - // @@Bala:Need to delete all the unused data-blocks + // Delete the QD + if (this->size_) + { + TAO_Queued_Data *qd = this->dequeue_head (); + TAO_Queued_Data::release (qd); + } } size_t diff --git a/TAO/tao/Pluggable_Messaging.cpp b/TAO/tao/Pluggable_Messaging.cpp index c7ec77f596d..65c2c20a01c 100644 --- a/TAO/tao/Pluggable_Messaging.cpp +++ b/TAO/tao/Pluggable_Messaging.cpp @@ -14,9 +14,3 @@ TAO_Pluggable_Messaging::~TAO_Pluggable_Messaging (void) { } - -int -TAO_Pluggable_Messaging::more_messages (void) -{ - ACE_NOTSUP_RETURN (-1); -} diff --git a/TAO/tao/Pluggable_Messaging.h b/TAO/tao/Pluggable_Messaging.h index b1a9a8c300f..1a8eee1eb28 100644 --- a/TAO/tao/Pluggable_Messaging.h +++ b/TAO/tao/Pluggable_Messaging.h @@ -113,20 +113,24 @@ public: virtual void init (CORBA::Octet major, CORBA::Octet minor) = 0; - /// Reset teh messaging object - virtual void reset (int reset_flag = 1) = 0; - // Reset the messaging object - - /// @@ Bala: Documentation + /// Parse the incoming messages.. virtual int parse_incoming_messages (ACE_Message_Block &message_block) = 0; + /// Calculate the amount of data that is missing in the + /// message block. virtual ssize_t missing_data (ACE_Message_Block &incoming) = 0; + /// Get the details of the message parsed through the . virtual void get_message_data (TAO_Queued_Data *qd) = 0; + /* Extract the details of the next message from the + * through . Returns 1 if there are more messages and returns a + * 0 if there are no more messages in . + */ virtual int extract_next_message (ACE_Message_Block &incoming, TAO_Queued_Data *&qd) = 0; + /// Check whether the node needs consolidation from virtual int consolidate_node (TAO_Queued_Data *qd, ACE_Message_Block &incoming) = 0; @@ -153,8 +157,8 @@ public: /// request/response? virtual int is_ready_for_bidirectional (void) = 0; - /// Are there any more messages that needs processing? - virtual int more_messages (void); + /// Reset the messaging the object + virtual void reset (void); }; #if defined (__ACE_INLINE__) diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp index ff6d584ceb4..250689c9a37 100644 --- a/TAO/tao/Transport.cpp +++ b/TAO/tao/Transport.cpp @@ -976,13 +976,20 @@ TAO_Transport::consolidate_message (ACE_Message_Block &incoming, TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time) { - // Checek whether the last message in the queue is complete.. + // Check whether the last message in the queue is complete.. if (this->incoming_message_queue_.is_tail_complete () == 0) return this->consolidate_message_queue (incoming, missing_data, rh, max_wait_time); + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - TAO_Transport[%d]::consolidate_message \n", + this->id ())); + } + // Calculate the actual length of the load that we are supposed to // read which is equal to the + length of the buffer // that we have.. @@ -1022,9 +1029,15 @@ TAO_Transport::consolidate_message (ACE_Message_Block &incoming, if (missing_data > 0) { + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - TAO_Transport[%d]::consolidate_message \n" + "insufficient read, queueing up the message \n", + this->id ())); + } // Get an instance of TAO_Queued_Data - TAO_Queued_Data *qd = - TAO_Queued_Data::get_queued_data (); + TAO_Queued_Data *qd = TAO_Queued_Data::get_queued_data (); // Add the missing data to the queue qd->missing_data_ = missing_data; @@ -1041,6 +1054,38 @@ TAO_Transport::consolidate_message (ACE_Message_Block &incoming, return 0; } + // Check to see if we have messages in queue. AT this point we + // cannot have have semi-complete messages in the queue as they + // would have been taken care before + if (this->incoming_message_queue_.queue_length ()) + { + // If we have messages in the queue, put the in the + // queue + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - TAO_Transport[%d]::consolidate_message \n" + " queueing up the message \n", + this->id ())); + } + + // Get an instance of TAO_Queued_Data + TAO_Queued_Data *qd = TAO_Queued_Data::get_queued_data (); + + // Duplicate the data block before putting it in the queue. + qd->msg_block_ = incoming.duplicate (); + + // Get the rest of the messaging data + this->messaging_object ()->get_message_data (qd); + + // Add it to the tail of the queue.. + this->incoming_message_queue_.enqueue_tail (qd); + + // Process one on the head of the queue and return + return this->process_queue_head (rh); + } + + // We dont have any missing data. Just make a queued_data node with // the existing message block and send it to the higher layers of // the ORB. @@ -1062,6 +1107,13 @@ TAO_Transport::consolidate_message_queue (ACE_Message_Block &incoming, TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time) { + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - TAO_Transport[%d]::consolidate_message_queue \n", + this->id ())); + } + // If the queue did not have a complete message put this piece of // message in the queue. We kow it did not have a complete // message. That is why we are here. @@ -1096,6 +1148,9 @@ TAO_Transport::consolidate_message_queue (ACE_Message_Block &incoming, return retval; } + // parse_consolidate_messages () would have processed one of the + // messages, so we better return as we dont want to starve other + // threads. return 0; } @@ -1106,12 +1161,9 @@ TAO_Transport::consolidate_message_queue (ACE_Message_Block &incoming, TAO_Queued_Data *qd = this->incoming_message_queue_.dequeue_tail (); - ACE_Message_Block *mb = - qd->msg_block_; - // Try to do a read again. If we have some luck it would be // great.. - ssize_t n = this->recv (mb->wr_ptr (), + ssize_t n = this->recv (qd->msg_block_->wr_ptr (), missing_data, max_wait_time); @@ -1120,7 +1172,7 @@ TAO_Transport::consolidate_message_queue (ACE_Message_Block &incoming, return n; // Move the write pointer - mb->wr_ptr (n); + qd->msg_block_->wr_ptr (n); // Decrement the missing data qd->missing_data_ -= n; @@ -1139,31 +1191,40 @@ TAO_Transport::consolidate_extra_messages (ACE_Message_Block &incoming, TAO_Resume_Handle &rh) { - //ACE_DEBUG ((LM_DEBUG, - // "TAO (%P|%t) Consolidating multiple messages.. \n")); - // Take a message from the tail.. + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - TAO_Transport[%d]::consolidate_extra_messages \n", + this->id ())); + } + + // Pick the tail of the queue TAO_Queued_Data *tail = this->incoming_message_queue_.dequeue_tail (); if (tail) { - size_t rd_pos = incoming.rd_ptr () - incoming.base (); - + // If we have a node in the tail, checek to see whether it needs + // consolidation. If so, just consolidate it. if (this->messaging_object ()->consolidate_node (tail, incoming) == -1) return -1; - rd_pos = incoming.rd_ptr () - incoming.base (); - // .. put the tail back in queue.. this->incoming_message_queue_.enqueue_tail (tail); } int retval = 1; - //ACE_DEBUG ((LM_DEBUG, - // "TAO (%P|%t) Extracting multiple messages.. \n")); + if (TAO_debug_level > 6) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - TAO_Transport[%d]::consolidate_extra_messages \n" + "..............extracting extra messages \n", + this->id ())); + } + // Extract messages.. while (retval == 1) { TAO_Queued_Data *q_data = 0; @@ -1171,11 +1232,11 @@ TAO_Transport::consolidate_extra_messages (ACE_Message_Block retval = this->messaging_object ()->extract_next_message (incoming, q_data); - if (q_data) this->incoming_message_queue_.enqueue_tail (q_data); } + // In case of error return.. if (retval == -1) { return retval; @@ -1188,9 +1249,7 @@ int TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd) { // Get the that we have received - TAO_Pluggable_Message_Type t = - qd->msg_type_; - + TAO_Pluggable_Message_Type t = qd->msg_type_; int result = 0; if (t == TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION) @@ -1223,7 +1282,7 @@ TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd) } else if (t == TAO_PLUGGABLE_MESSAGE_REPLY) { - // @@Bala: Maybe the input_cdr can be constructed from the + // @@todo: Maybe the input_cdr can be constructed from the // message_block TAO_Pluggable_Reply_Params params (this->orb_core ()); @@ -1289,6 +1348,7 @@ TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd) if (result == 0) { + this->messaging_object ()->reset (); // The reply dispatcher was no longer registered. @@ -1326,7 +1386,6 @@ TAO_Transport::process_queue_head (TAO_Resume_Handle &rh) this->id ())); } - // See if the message in the head of the queue is complete... if (this->incoming_message_queue_.is_head_complete () == 1) { @@ -1334,15 +1393,15 @@ TAO_Transport::process_queue_head (TAO_Resume_Handle &rh) TAO_Queued_Data *qd = this->incoming_message_queue_.dequeue_head (); - // Get the event handler.. - ACE_Event_Handler *eh = this->event_handler_i (); - if (eh == 0) - return -1; - // Now that we have pulled out out one message out of the queue, // check whether we have one more message in the queue... if (this->incoming_message_queue_.is_head_complete () == 1) { + // Get the event handler.. + ACE_Event_Handler *eh = this->event_handler_i (); + if (eh == 0) + return -1; + // Get the reactor associated with the event handler ACE_Reactor *reactor = eh->reactor (); if (reactor == 0) @@ -1375,15 +1434,10 @@ TAO_Transport::process_queue_head (TAO_Resume_Handle &rh) } else { - // We dont have any more messages in the queue. Get the - // handle associated with the Transport and make a - // TAO_Resume_Handle out of it - TAO_Resume_Handle new_rh (this->orb_core_, - eh->get_handle ()); - // As we are ready to process the last message just resume - // the handle - new_rh.resume_handle (); + // the handle. Set the flag incase someone had reset the flag.. + rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE); + rh.resume_handle (); } // Process the message... diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h index 69dad7903f8..99c0c4512e9 100644 --- a/TAO/tao/Transport.h +++ b/TAO/tao/Transport.h @@ -652,7 +652,8 @@ protected: int consolidate_extra_messages (ACE_Message_Block &incoming, TAO_Resume_Handle &rh); - /// @@ Bala: Documentation + /// Process the message by sending it to the higher layers of the + /// ORB. int process_parsed_messages (TAO_Queued_Data *qd); public: @@ -859,7 +860,7 @@ protected: TAO_Queued_Message *head_; TAO_Queued_Message *tail_; - /// @@Bala: Docu?? + /// Queue of the incoming messages.. TAO_Incoming_Message_Queue incoming_message_queue_; /// The queue will start draining no later than -- cgit v1.2.1