diff options
-rw-r--r-- | TAO/ChangeLogs/ChangeLog-02a | 32 | ||||
-rw-r--r-- | TAO/tao/GIOP_Message_Base.cpp | 51 | ||||
-rw-r--r-- | TAO/tao/GIOP_Message_Handler.cpp | 440 | ||||
-rw-r--r-- | TAO/tao/GIOP_Message_Handler.h | 60 | ||||
-rw-r--r-- | TAO/tao/GIOP_Message_Handler.inl | 32 | ||||
-rw-r--r-- | TAO/tao/IIOP_Connection_Handler.cpp | 6 | ||||
-rw-r--r-- | TAO/tao/IIOP_Transport.cpp | 19 | ||||
-rw-r--r-- | TAO/tao/Strategies/SHMIOP_Transport.cpp | 20 | ||||
-rw-r--r-- | TAO/tao/Strategies/UIOP_Transport.cpp | 19 | ||||
-rw-r--r-- | TAO/tao/Wait_On_Read.cpp | 22 |
10 files changed, 362 insertions, 339 deletions
diff --git a/TAO/ChangeLogs/ChangeLog-02a b/TAO/ChangeLogs/ChangeLog-02a index 8113cbdbaa0..dd44eb3a90b 100644 --- a/TAO/ChangeLogs/ChangeLog-02a +++ b/TAO/ChangeLogs/ChangeLog-02a @@ -1,3 +1,35 @@ +Sun May 6 10:02:08 2001 Balachandran Natarajan <bala@cs.wustl.edu> + + * tao/GIOP_Message_Base.cpp: + * tao/GIOP_Message_Handler.cpp: + * tao/GIOP_Message_Handler.h: + * tao/GIOP_Message_Handler.inl: Changed the error handling + conditions. When we come across a read failure with EWOULDBLOCK + we used to ask the reactor to call us back. This caused + thrashing when the application received a signal. We dont ask + the reactor to call us back, instead we just return a zero to + the reactor. Further, changed the error codes to match the + reactor semantics. + + * tao/IIOP_Connection_Handler.cpp: + * tao/IIOP_Transport.cpp: Made sure that we pass the error codes + from the GIOP layer, as is to the reactor. We dont manipulate + the return values as we did before. The return values just match + the reactor semantics ie. returns -1 on error, 1 for a quick + callback and a zero for everything is fine case. + + * tao/Wait_On_Read.cpp: The wait () call used to loop on + reply_received flag alone. We now use a temporary variable also + for the looping similar to the way other Wait strategies + loop. The other strategies used a reactor and adhered to the + reactor semantics. The Wait_On_Read did not use a reactor and + followed a different semantics for success and failure. This + created confusion at the protocol implementation level. + + * tao/Strategies/SHMIOP_Transport.cpp: + * tao/Strategies/UIOP_Transport.cpp: Replicated the changes from + IIOP. + Sun May 6 9:49:36 2001 Yamuna Krishnamurthy <yamuna@cs.wustl.edu> * orbsvcs/tests/AVStreams/Component_Switching/distributer.cpp: diff --git a/TAO/tao/GIOP_Message_Base.cpp b/TAO/tao/GIOP_Message_Base.cpp index d4a5dfcbf67..04b587cbc10 100644 --- a/TAO/tao/GIOP_Message_Base.cpp +++ b/TAO/tao/GIOP_Message_Base.cpp @@ -185,22 +185,38 @@ TAO_GIOP_Message_Base::read_message (TAO_Transport *transport, { // Call the handler to read and do a simple parse of the header of // the message. - int retval = this->message_handler_.read_parse_message (transport); + int retval = + this->message_handler_.read_messages (transport); if (retval < 1) return retval; - // Get the message state - TAO_GIOP_Message_State &state = - this->message_handler_.message_state (); + retval = this->message_handler_.parse_message_header (); - // Set the state internally for parsing and generating messages - this->set_state (state.giop_version.major, - state.giop_version.minor); - retval = this->message_handler_.is_message_ready (transport); + // Error in the message that was received + if (retval == -1) + return -1; + // If -2, we want the reactor to call us back, so return 1 + else if (retval == -2) + return 1; + + if (retval != 0) + { + // Get the message state + TAO_GIOP_Message_State &state = + this->message_handler_.message_state (); + + // Set the state internally for parsing and generating messages + this->set_state (state.giop_version.major, + state.giop_version.minor); + } + + // We return 2, it is ugly. But the reactor semantics has made us to + // limp :( + return 2; + - return retval; } int @@ -1147,12 +1163,12 @@ TAO_GIOP_Message_Base::is_ready_for_bidirectional (void) int TAO_GIOP_Message_Base::more_messages (void) { - // Does the handler have more messages for processing? - int retval = this->message_handler_.more_messages (); + int retval = + this->message_handler_.is_message_ready (); + + if (retval <= 0) + return retval; - if (retval == TAO_MESSAGE_BLOCK_COMPLETE || - retval == TAO_MESSAGE_BLOCK_INCOMPLETE) - return 1; // Get the message state TAO_GIOP_Message_State &state = @@ -1162,12 +1178,5 @@ TAO_GIOP_Message_Base::more_messages (void) this->set_state (state.giop_version.major, state.giop_version.minor); - // retval = this->message_handler_.is_message_ready (); - - if (retval == 1) - { - return TAO_MESSAGE_BLOCK_NEEDS_PROCESSING; - } - return retval; } diff --git a/TAO/tao/GIOP_Message_Handler.cpp b/TAO/tao/GIOP_Message_Handler.cpp index 8681bf8b749..11f79827fdd 100644 --- a/TAO/tao/GIOP_Message_Handler.cpp +++ b/TAO/tao/GIOP_Message_Handler.cpp @@ -42,29 +42,195 @@ TAO_GIOP_Message_Handler::TAO_GIOP_Message_Handler (TAO_ORB_Core * orb_core, } + int -TAO_GIOP_Message_Handler::read_parse_message (TAO_Transport *transport) +TAO_GIOP_Message_Handler::read_messages (TAO_Transport *transport) { - int retval = this->read_messages (transport); + // Read the message from the transport. The size of the message read + // is the maximum size of the buffer that we have less the amount of + // data that has already been read in to the buffer. + ssize_t n = transport->recv (this->current_buffer_.wr_ptr (), + this->current_buffer_.space ()); + + if (n == -1) + { + if (errno == EWOULDBLOCK) + return 0; - if (retval < 1) - return retval; + return -1; + } + // @@ What are the other error handling here?? + else if (n == 0) + { + return -1; + } - // Check what message are we waiting for and take suitable action - if (this->message_status_ == TAO_GIOP_WAITING_FOR_HEADER) + if (TAO_debug_level == 5) { - if (this->current_buffer_.length () >= - TAO_GIOP_MESSAGE_HEADER_LEN) + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - GIOP_Message_Handler::read_messages" + " received %d bytes\n", + n)); + + size_t len; + for (size_t offset = 0; offset < size_t(n); offset += len) { - return this->parse_header (); + len = n - offset; + if (len > 512) + len = 512; + ACE_HEX_DUMP ((LM_DEBUG, + this->current_buffer_.wr_ptr () + offset, + len, + "TAO (%P|%t) - read_messages ")); } + ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - received %d bytes \n", n)); } - return retval; + + // Now we have a succesful read. First adjust the write pointer + this->current_buffer_.wr_ptr (n); + + + // Success + return 1; } int -TAO_GIOP_Message_Handler::parse_header (void) +TAO_GIOP_Message_Handler::is_message_ready (void) +{ + if (this->message_status_ == TAO_GIOP_WAITING_FOR_PAYLOAD) + { + size_t len = this->current_buffer_.length (); + char *buf = this->current_buffer_.rd_ptr (); + + // Set the buf pointer to the start of the GIOP header + buf -= TAO_GIOP_MESSAGE_HEADER_LEN; + + // Dump the incoming message . It will be dumped only if the + // debug level is greater than 5 anyway. + this->mesg_base_->dump_msg ( + "Recv msg", + ACE_reinterpret_cast (u_char *, + buf), + len + TAO_GIOP_MESSAGE_HEADER_LEN); + if (len == this->message_state_.message_size) + { + // If the buffer length is equal to the size of the payload we + // have exactly one message. + this->message_status_ = TAO_GIOP_WAITING_FOR_HEADER; + + // Check whether we have received only the first part of the + // fragment. + return this->message_state_.is_complete (this->current_buffer_); + } + else if (len > this->message_state_.message_size) + { + // If the length is greater we have received some X messages + // and a part of X + 1 messages (probably) with X varying + // from 1 to N. + this->message_status_ = TAO_GIOP_MULTIPLE_MESSAGES; + + // Clone the data that we read in. + this->supp_buffer_.data_block ( + this->current_buffer_.data_block ()->clone ()); + + // Set the read and write pointer for the supplementary + // buffer. + size_t rd_pos = this->rd_pos (); + this->supp_buffer_.rd_ptr (rd_pos + + this->message_state_.message_size); + this->supp_buffer_.wr_ptr (this->wr_pos ()); + + // Reset the current buffer + this->current_buffer_.reset (); + + // Set the read and write pointers again for the current + // buffer. We change the write pointer settings as we would + // like to process a single message. + this->current_buffer_.rd_ptr (rd_pos); + this->current_buffer_.wr_ptr (rd_pos + + this->message_state_.message_size); + + return this->message_state_.is_complete (this->current_buffer_); + } + } + else if (this->message_status_ == TAO_GIOP_MULTIPLE_MESSAGES) + { + size_t len = this->supp_buffer_.length (); + + if (len > TAO_GIOP_MESSAGE_HEADER_LEN) + { + // + this->current_buffer_.copy ( + this->supp_buffer_.rd_ptr (), + TAO_GIOP_MESSAGE_HEADER_LEN); + + this->supp_buffer_.rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN); + + if (this->parse_message_header_i () == -1) + return -1; + + return this->get_message (); + } + else + { + // We have smaller than the header size left here. We + // just copy the rest of the stuff and reset things so that + // we can read the rest of the stuff from the socket. + this->current_buffer_.copy ( + this->supp_buffer_.rd_ptr (), + len); + + // Reset the supp buffer now + this->supp_buffer_.reset (); + + this->message_status_ = TAO_GIOP_WAITING_FOR_HEADER; + } + + } + + // Just send us back to the reactor so that we can for more data to + // come in . + return 0; +} + +ACE_Data_Block * +TAO_GIOP_Message_Handler::steal_data_block (void) +{ + ACE_Data_Block *db = + this->current_buffer_.data_block ()->clone_nocopy (); + + ACE_Data_Block *old_db = + this->current_buffer_.replace_data_block (db); + + ACE_CDR::mb_align (&this->current_buffer_); + + return old_db; +} + + +void +TAO_GIOP_Message_Handler::reset (int reset_flag) +{ + // Reset the contents of the message state + this->message_state_.reset (reset_flag); + + // Reset the current buffer + this->current_buffer_.reset (); + + ACE_CDR::mb_align (&this->current_buffer_); + + if (this->message_status_ != TAO_GIOP_MULTIPLE_MESSAGES) + { + this->supp_buffer_.reset (); + ACE_CDR::mb_align (&this->supp_buffer_); + } + +} + + +int +TAO_GIOP_Message_Handler::parse_message_header_i (void) { if (TAO_debug_level > 8) { @@ -279,152 +445,7 @@ TAO_GIOP_Message_Handler::read_ulong (const char *ptr) return x; } -int -TAO_GIOP_Message_Handler::is_message_ready (TAO_Transport * /*transport*/) -{ - if (this->message_status_ == TAO_GIOP_WAITING_FOR_PAYLOAD) - { - size_t len = this->current_buffer_.length (); - char *buf = this->current_buffer_.rd_ptr (); - // Set the buf pointer to the start of the GIOP header - buf -= TAO_GIOP_MESSAGE_HEADER_LEN; - - if (len == this->message_state_.message_size) - { - // If the buffer length is equal to the size of the payload we - // have exactly one message. - this->message_status_ = TAO_GIOP_WAITING_FOR_HEADER; - - // The message will be dumped only if the debug level is - // greater than 5 anyway. - this->mesg_base_->dump_msg ( - "Recv msg", - ACE_reinterpret_cast (u_char *, - buf), - len + TAO_GIOP_MESSAGE_HEADER_LEN); - - // Check whether we have received only the first part of the - // fragment. - return this->message_state_.is_complete (this->current_buffer_); - } - else if (len > this->message_state_.message_size) - { - // If the length is greater we have received some X messages - // and a part of X + 1 messages (probably) with X varying - // from 1 to N. - this->message_status_ = TAO_GIOP_MULTIPLE_MESSAGES; - - // The message will be dumped only if the debug level is - // greater than 5 anyway. - this->mesg_base_->dump_msg ( - "Recv msg", - ACE_reinterpret_cast (u_char *, - buf), - this->message_state_.message_size + - TAO_GIOP_MESSAGE_HEADER_LEN); - - this->supp_buffer_.data_block ( - this->current_buffer_.data_block ()->clone ()); - - // Set the read and write pointer for the supplementary - // buffer. - size_t rd_pos = this->rd_pos (); - this->supp_buffer_.rd_ptr (rd_pos + - this->message_state_.message_size); - this->supp_buffer_.wr_ptr (this->wr_pos ()); - - // Reset the current buffer - this->current_buffer_.reset (); - - - // Set the read and write pointers again for the current buffer - this->current_buffer_.rd_ptr (rd_pos); - this->current_buffer_.wr_ptr (rd_pos + - this->message_state_.message_size); - - return this->message_state_.is_complete (this->current_buffer_); - } - -#if 0 - // @@ This is the ultimate hack for SHMIOP and related protocols - // @@ that uses the reactor for signalling rather than for data - // @@ transfer. This hack was done in the at the last minute for - // @@ the beta 1.1.13. This hack needs to be removed for the - // @@ next beta - Bala - else if (transport->reactor_signalling ()) - { - if (TAO_debug_level > 4) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - " - "TAO_GIOP_Message_Handler::is_message_ready, " - " disgusting reactor signalling hack!!!!\n")); - } - - if (this->read_messages (transport) == -1) - return -1; - - if (TAO_debug_level > 4) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - " - "TAO_GIOP_Message_Handler::is_message_ready, " - " read_messages called\n")); - } - - // By now we should be having the whole message read in. - this->message_status_ = TAO_GIOP_WAITING_FOR_HEADER; - return this->message_state_.is_complete (this->current_buffer_); - } -#endif /* 0 */ - } - - // Just return allowing the reactor to call us back to get the rest - // of the info - return 0; -} - - -int -TAO_GIOP_Message_Handler::more_messages (void) -{ - if (this->message_status_ == TAO_GIOP_MULTIPLE_MESSAGES) - { - size_t len = this->supp_buffer_.length (); - - if (len > TAO_GIOP_MESSAGE_HEADER_LEN) - { - this->current_buffer_.copy ( - this->supp_buffer_.rd_ptr (), - TAO_GIOP_MESSAGE_HEADER_LEN); - - this->supp_buffer_.rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN); - - if (this->parse_header () == -1) - return -1; - - return this->get_message (); - } - else - { - // We have smaller than the header size left here. We - // just copy the rest of the stuff and reset things so that - // we can read the rest of the stuff from the socket. - this->current_buffer_.copy ( - this->supp_buffer_.rd_ptr (), - len); - - // Reset the supp buffer now - this->supp_buffer_.reset (); - - this->message_status_ = TAO_GIOP_WAITING_FOR_HEADER; - } - - } - - return TAO_MESSAGE_BLOCK_COMPLETE; -} int @@ -437,6 +458,7 @@ TAO_GIOP_Message_Handler::get_message (void) this->current_buffer_.rd_ptr (); buf -= TAO_GIOP_MESSAGE_HEADER_LEN; + if (len == this->message_state_.message_size) { // If the buffer length is equal to the size of the payload we @@ -449,10 +471,11 @@ TAO_GIOP_Message_Handler::get_message (void) // The message will be dumped only if the debug level is // greater than 5 anyway. this->mesg_base_->dump_msg ( - "Recv msg", - ACE_reinterpret_cast (u_char *, - buf), - len + TAO_GIOP_MESSAGE_HEADER_LEN); + "Recv msg", + ACE_reinterpret_cast (u_char *, + buf), + len + + TAO_GIOP_MESSAGE_HEADER_LEN); this->supp_buffer_.rd_ptr (this->message_state_.message_size); return this->message_state_.is_complete (this->current_buffer_); @@ -470,11 +493,11 @@ TAO_GIOP_Message_Handler::get_message (void) // The message will be dumped only if the debug level is // greater than 5 anyway. this->mesg_base_->dump_msg ( - "Recv msg", - ACE_reinterpret_cast (u_char *, - buf), - this->message_state_.message_size + - TAO_GIOP_MESSAGE_HEADER_LEN); + "Recv msg", + ACE_reinterpret_cast (u_char *, + buf), + len + + TAO_GIOP_MESSAGE_HEADER_LEN); this->supp_buffer_.rd_ptr (this->message_state_.message_size); return this->message_state_.is_complete (this->current_buffer_); @@ -490,90 +513,5 @@ TAO_GIOP_Message_Handler::get_message (void) } } - return TAO_MESSAGE_BLOCK_INCOMPLETE; -} - -int -TAO_GIOP_Message_Handler::read_messages (TAO_Transport *transport) -{ - // Read the message from the transport. The size of the message read - // is the maximum size of the buffer that we have less the amount of - // data that has already been read in to the buffer. - ssize_t n = transport->recv (this->current_buffer_.wr_ptr (), - this->current_buffer_.space ()); - - if (n == -1) - { - if (errno == EWOULDBLOCK) - return 0; - - return -1; - } - // @@ What are the other error handling here?? - else if (n == 0) - { - return -1; - } - - if (TAO_debug_level == 5) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - GIOP_Message_Handler::read_messages" - " received %d bytes\n", - n)); - - size_t len; - for (size_t offset = 0; offset < size_t(n); offset += len) - { - len = n - offset; - if (len > 512) - len = 512; - ACE_HEX_DUMP ((LM_DEBUG, - this->current_buffer_.wr_ptr () + offset, - len, - "TAO (%P|%t) - read_messages ")); - } - ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - received %d bytes \n", n)); - } - - // Now we have a succesful read. First adjust the write pointer - this->current_buffer_.wr_ptr (n); - - // Success - return 1; -} - - -ACE_Data_Block * -TAO_GIOP_Message_Handler::steal_data_block (void) -{ - ACE_Data_Block *db = - this->current_buffer_.data_block ()->clone_nocopy (); - - ACE_Data_Block *old_db = - this->current_buffer_.replace_data_block (db); - - ACE_CDR::mb_align (&this->current_buffer_); - - return old_db; -} - - -void -TAO_GIOP_Message_Handler::reset (int reset_flag) -{ - // Reset the contents of the message state - this->message_state_.reset (reset_flag); - - // Reset the current buffer - this->current_buffer_.reset (); - - ACE_CDR::mb_align (&this->current_buffer_); - - if (this->message_status_ != TAO_GIOP_MULTIPLE_MESSAGES) - { - this->supp_buffer_.reset (); - ACE_CDR::mb_align (&this->supp_buffer_); - } - + return 0; } diff --git a/TAO/tao/GIOP_Message_Handler.h b/TAO/tao/GIOP_Message_Handler.h index 9ccae8b2488..a4863362cf5 100644 --- a/TAO/tao/GIOP_Message_Handler.h +++ b/TAO/tao/GIOP_Message_Handler.h @@ -37,20 +37,6 @@ enum TAO_GIOP_Message_Status TAO_GIOP_MULTIPLE_MESSAGES }; -enum TAO_Message_Block_Content_Status -{ - /// The buffer has nomore info for processing ie. all information - /// have been processed - TAO_MESSAGE_BLOCK_COMPLETE = 3, - - /// The buffer has something meaningful and needs processing - TAO_MESSAGE_BLOCK_NEEDS_PROCESSING, - - /// The buffer has nothing meaningful. Need to read more data from - /// the socket to make the reamaining data meaningful - TAO_MESSAGE_BLOCK_INCOMPLETE -}; - /** * @class TAO_GIOP_Message_Handler * @@ -71,17 +57,31 @@ public: TAO_GIOP_Message_Base *base, size_t input_cdr_size = ACE_CDR::DEFAULT_BUFSIZE); - /// Read the message from the transport in to the - /// <current_buffer_>. This method delegates responsibility of - /// parsing to some of the helper methods. - int read_parse_message (TAO_Transport *transport); + + /// Reads the message from the <transport> and sets the <wr_ptr> of + /// the buffer appropriately. + int read_messages (TAO_Transport *transport); + + /// Parse the GIOP message header if we have read bytes suffcient + /// bytes. There are four possibilities + /// - We did not read sufficient bytes, then make the reactor to + /// call us back. (return -2) + /// - We read a piece of message that was left out in the + /// socket. In such cases we just go ahead with more processing + /// (return 0). + /// - We have sufficient info for processing the header and we + /// processed it succesfully. (return 1); + /// - Any errors in processing will return a -1. + int parse_message_header (void); /// Check whether we have atleast one complete message ready for /// processing. - int is_message_ready (TAO_Transport *transport); + int is_message_ready (void); - /// Do we have more messages for processing? - int more_messages (void); + /// Return the underlying data block of the <current_buffer_>. At + /// the sametime making a new data_block for itself. The read and + /// write pointer positions would be reset. + ACE_Data_Block *steal_data_block (void); /// Reset the contents of the <current_buffer_> if no more requests /// need to be processed. We reset the contents of the @@ -94,14 +94,6 @@ public: /// Return the pointer to the data block within the message block ACE_Data_Block *data_block (void) const; - /// Return the underlying data block of the <current_buffer_>. At - /// the sametime making a new data_block for itself. The read and - /// write pointer positions would be reset. - ACE_Data_Block *steal_data_block (void); - - /// Return the rd_ptr of the <current_buffer_> - char *rd_ptr (void) const; - /// Return the position of the read pointer in the <current_buffer_> size_t rd_pos (void) const; @@ -110,8 +102,9 @@ public: private: - /// Parses the header information from the <current_buffer_>. - int parse_header (void); + /// Actually parses the header information from the + /// <current_buffer_>. + int parse_message_header_i (void); /// Validates the first 4 bytes that contain the magic word /// "GIOP". Also calls the validate_version () on the incoming @@ -130,11 +123,6 @@ private: /// <current_buffer_> int get_message (void); - /// Reads the message from the <transport> and sets the <wr_ptr> of - /// the buffer appropriately. - int read_messages (TAO_Transport *transport); - - private: /// The pointer to the object that holds us diff --git a/TAO/tao/GIOP_Message_Handler.inl b/TAO/tao/GIOP_Message_Handler.inl index 89a3b3679e6..3705d91fe27 100644 --- a/TAO/tao/GIOP_Message_Handler.inl +++ b/TAO/tao/GIOP_Message_Handler.inl @@ -1,6 +1,27 @@ // -*- C++ -*- // $Id$ +ACE_INLINE int +TAO_GIOP_Message_Handler::parse_message_header (void) +{ + // Check what message are we waiting for and take suitable action + if (this->message_status_ == TAO_GIOP_WAITING_FOR_HEADER) + { + if (this->current_buffer_.length () >= + TAO_GIOP_MESSAGE_HEADER_LEN) + { + return this->parse_message_header_i (); + } + + // We dont have suffcient information to decipher the GIOP + // header. Make sure that the reactor calls us back. + return -1; + } + + // The last read just "read" left-over messages + return 0; +} + ACE_INLINE TAO_GIOP_Message_State & TAO_GIOP_Message_Handler::message_state (void) { @@ -13,17 +34,6 @@ TAO_GIOP_Message_Handler::data_block (void) const return this->current_buffer_.data_block (); } - - -ACE_INLINE char * -TAO_GIOP_Message_Handler::rd_ptr (void) const -{ - if (this->supp_buffer_.length () > 0) - return this->supp_buffer_.rd_ptr (); - - return this->current_buffer_.rd_ptr (); -} - ACE_INLINE size_t TAO_GIOP_Message_Handler::rd_pos (void) const { diff --git a/TAO/tao/IIOP_Connection_Handler.cpp b/TAO/tao/IIOP_Connection_Handler.cpp index 238b3ed1fa9..506d4101a00 100644 --- a/TAO/tao/IIOP_Connection_Handler.cpp +++ b/TAO/tao/IIOP_Connection_Handler.cpp @@ -338,11 +338,9 @@ TAO_IIOP_Connection_Handler::handle_input_i (ACE_HANDLE, if (--this->pending_upcalls_ <= 0) result = -1; - if (result == -1) + if (result == -1 || + result == 1) return result; - else if (result == 0) - // Requires another call to handle_input () - return 1; return 0; } diff --git a/TAO/tao/IIOP_Transport.cpp b/TAO/tao/IIOP_Transport.cpp index dcc650fca74..a680226fd1e 100644 --- a/TAO/tao/IIOP_Transport.cpp +++ b/TAO/tao/IIOP_Transport.cpp @@ -107,17 +107,21 @@ TAO_IIOP_Transport::read_process_message (ACE_Time_Value *max_wait_time, this->tms_->connection_closed (); return -1; } - if (result == 0) + if (result < 2) return result; // Now we know that we have been able to read the complete message // here.. We loop here to see whether we have read more than one // message in our read. - do + + // Set the result state + result = 1; + + // See we use the reactor semantics again + while (result > 0) { result = this->process_message (); } - while (result > 1); return result; } @@ -260,6 +264,13 @@ TAO_IIOP_Transport::tear_listen_point_list (TAO_InputCDR &cdr) int TAO_IIOP_Transport::process_message (void) { + // Check whether we have messages for processing + int retval = + this->messaging_object_->more_messages (); + + if (retval <= 0) + return retval; + // Get the <message_type> that we have received TAO_Pluggable_Message_Type t = this->messaging_object_->message_type (); @@ -371,7 +382,7 @@ TAO_IIOP_Transport::process_message (void) return -1; } - return this->messaging_object_->more_messages (); + return 1; } diff --git a/TAO/tao/Strategies/SHMIOP_Transport.cpp b/TAO/tao/Strategies/SHMIOP_Transport.cpp index eea9e486280..2a6a0b61186 100644 --- a/TAO/tao/Strategies/SHMIOP_Transport.cpp +++ b/TAO/tao/Strategies/SHMIOP_Transport.cpp @@ -116,17 +116,21 @@ TAO_SHMIOP_Transport::read_process_message (ACE_Time_Value *max_wait_time, this->tms_->connection_closed (); return -1; } - if (result == 0) + if (result < 2) return result; // Now we know that we have been able to read the complete message // here.. We loop here to see whether we have read more than one // message in our read. - do + + // Set the result state + result = 1; + + // See we use the reactor semantics again + while (result > 0) { result = this->process_message (); } - while (result > 1); return result; } @@ -231,6 +235,13 @@ TAO_SHMIOP_Transport::reactor_signalling (void) int TAO_SHMIOP_Transport::process_message (void) { + // Check whether we have messages for processing + int retval = + this->messaging_object_->more_messages (); + + if (retval <= 0) + return retval; + // Get the <message_type> that we have received TAO_Pluggable_Message_Type t = this->messaging_object_->message_type (); @@ -338,8 +349,7 @@ TAO_SHMIOP_Transport::process_message (void) return -1; } - - return this->messaging_object_->more_messages (); + return 1; } void diff --git a/TAO/tao/Strategies/UIOP_Transport.cpp b/TAO/tao/Strategies/UIOP_Transport.cpp index 97c4f84d145..0049317a677 100644 --- a/TAO/tao/Strategies/UIOP_Transport.cpp +++ b/TAO/tao/Strategies/UIOP_Transport.cpp @@ -106,17 +106,21 @@ TAO_UIOP_Transport::read_process_message (ACE_Time_Value *max_wait_time, this->tms_->connection_closed (); return -1; } - if (result == 0) + if (result < 2) return result; // Now we know that we have been able to read the complete message // here.. We loop here to see whether we have read more than one // message in our read. - do + + // Set the result state + result = 1; + + // See we use the reactor semantics again + while (result > 0) { result = this->process_message (); } - while (result > 1); return result; } @@ -215,6 +219,13 @@ TAO_UIOP_Transport::messaging_init (CORBA::Octet major, int TAO_UIOP_Transport::process_message (void) { + // Check whether we have messages for processing + int retval = + this->messaging_object_->more_messages (); + + if (retval <= 0) + return retval; + // Get the <message_type> that we have received TAO_Pluggable_Message_Type t = this->messaging_object_->message_type (); @@ -322,7 +333,7 @@ TAO_UIOP_Transport::process_message (void) return -1; } - return this->messaging_object_->more_messages (); + return 1; } void diff --git a/TAO/tao/Wait_On_Read.cpp b/TAO/tao/Wait_On_Read.cpp index 4e73325944a..3320298e49d 100644 --- a/TAO/tao/Wait_On_Read.cpp +++ b/TAO/tao/Wait_On_Read.cpp @@ -22,16 +22,32 @@ TAO_Wait_On_Read::wait (ACE_Time_Value * max_wait_time, int &reply_received) { reply_received = 0; - while (reply_received == 0) + + // Do the same sort of looping that is done in other wait + // strategies. + int retval = 0; + while (1) { - reply_received = + retval = this->transport_->read_process_message (max_wait_time, 1); + + // If we got our reply, no need to run the loop any + // further. + if (reply_received) + break; + + // @@ We are not checking for timeouts here... + + // If we got an error just break + if (retval == -1) + break; } - if (reply_received == -1) + if (reply_received == -1 || retval == -1) { this->transport_->close_connection (); } + return (reply_received == 1 ? 0 : reply_received); } |