diff options
author | bala <balanatarajan@users.noreply.github.com> | 2000-12-26 13:12:41 +0000 |
---|---|---|
committer | bala <balanatarajan@users.noreply.github.com> | 2000-12-26 13:12:41 +0000 |
commit | 66de31862ad511040474c5c8238d68bc0c24ff5e (patch) | |
tree | 2b0036b7e8356f084f18ee099a2515f5f290cd8e | |
parent | ce90ad7096626d524bd1567264c8fcffd9879196 (diff) | |
download | ATCD-66de31862ad511040474c5c8238d68bc0c24ff5e.tar.gz |
*** empty log message ***
-rw-r--r-- | TAO/tao/GIOP_Message_Base.cpp | 321 | ||||
-rw-r--r-- | TAO/tao/GIOP_Message_Base.h | 20 | ||||
-rw-r--r-- | TAO/tao/GIOP_Message_Generator_Parser_Impl.h | 5 | ||||
-rw-r--r-- | TAO/tao/GIOP_Message_Handler.cpp | 291 | ||||
-rw-r--r-- | TAO/tao/GIOP_Message_Handler.h | 120 | ||||
-rw-r--r-- | TAO/tao/GIOP_Message_Handler.inl | 15 | ||||
-rw-r--r-- | TAO/tao/IIOP_Connection_Handler.cpp | 9 | ||||
-rw-r--r-- | TAO/tao/IIOP_Transport.cpp | 12 | ||||
-rw-r--r-- | TAO/tao/IIOP_Transport.h | 5 | ||||
-rw-r--r-- | TAO/tao/Pluggable.h | 10 | ||||
-rw-r--r-- | TAO/tao/TAO.dsp | 27 | ||||
-rw-r--r-- | TAO/tao/TAO_Static.dsp | 12 | ||||
-rw-r--r-- | TAO/tao/things_that_needs | 2 |
13 files changed, 566 insertions, 283 deletions
diff --git a/TAO/tao/GIOP_Message_Base.cpp b/TAO/tao/GIOP_Message_Base.cpp index 3bce1bc300e..648dc1cb264 100644 --- a/TAO/tao/GIOP_Message_Base.cpp +++ b/TAO/tao/GIOP_Message_Base.cpp @@ -14,20 +14,15 @@ # include "tao/GIOP_Message_Base.i" #endif /* __ACE_INLINE__ */ -// Constants for GIOP. They are declared static as that will put them -// in file scope. -static const size_t TAO_GIOP_MESSAGE_HEADER_LEN = 12; -static const size_t TAO_GIOP_MESSAGE_SIZE_OFFSET = 8; -static const size_t TAO_GIOP_VERSION_MINOR_OFFSET = 5; -static const size_t TAO_GIOP_VERSION_MAJOR_OFFSET = 4; -static const size_t TAO_GIOP_MESSAGE_FLAGS_OFFSET = 6; -static const size_t TAO_GIOP_MESSAGE_TYPE_OFFSET = 7; + ACE_RCSID(tao, GIOP_Message_Base, "$Id$") + + TAO_GIOP_Message_Base::TAO_GIOP_Message_Base (TAO_ORB_Core *orb_core) - : message_state_ (orb_core), + : message_handler_ (orb_core), output_ (0), cdr_buffer_alloc_ ( orb_core->resource_factory ()->output_cdr_buffer_allocator () @@ -85,9 +80,9 @@ void TAO_GIOP_Message_Base::reset (int reset_flag) { // Reset the message state - this->message_state_.reset (reset_flag); + this->message_handler_.message_state ().reset (reset_flag); + this->message_handler_.message_block ()->reset (); - //What else??? } int @@ -192,109 +187,34 @@ TAO_GIOP_Message_Base::read_message (TAO_Transport *transport, int /*block */, ACE_Time_Value *max_wait_time) { - if (this->message_state_.header_received () == 0) - { - int retval = - TAO_GIOP_Utils::read_bytes_input (transport, - message_state_.cdr, - TAO_GIOP_MESSAGE_HEADER_LEN , - max_wait_time); - if (retval == -1) - { - if (TAO_debug_level > 0) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - \n") - ACE_TEXT ("TAO_GIOP_Message_Base::read_message \n"))); - } - - return -1; - } - - if (this->parse_magic_bytes () == -1) - { - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) -"), - ACE_TEXT ("TAO_GIOP_Message_Base::handle_input, parse_magic_bytes \n"))); - return -1; - } - - // Read the rest of the stuff. That should be read by the - // corresponding states - if (this->parse_header () == -1) - { - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t|%N%l) -\n"), - ACE_TEXT ("TAO_GIOP_Message_Base::handle_input \n"))); - return -1; - } - - if (this->message_state_.cdr.grow (TAO_GIOP_MESSAGE_HEADER_LEN + - this->message_state_.message_size) == -1) - { - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t|%N|%l) - %p\n"), - ACE_TEXT ("ACE_CDR::grow"))); - return -1; - } - - // Growing the buffer may have reset the rd_ptr(), but we want - // to leave it just after the GIOP header (that was parsed - // already); - this->message_state_.cdr.skip_bytes (TAO_GIOP_MESSAGE_HEADER_LEN); - } + // Call the handler to read and do a simple parse of the header of + // the message. + if (this->message_handler_.read_parse_message (transport) == -1) + return -1; - size_t missing_data = - this->message_state_.message_size - this->message_state_.current_offset; + // Get the message state + TAO_GIOP_Message_State &state = + this->message_handler_.message_state (); - ssize_t n = - TAO_GIOP_Utils::read_buffer (transport, - this->message_state_.cdr.rd_ptr () - + this->message_state_.current_offset, - missing_data, - max_wait_time); + // Set the state internally for parsing and generating messages + this->set_state (state.giop_version.major, + state.giop_version.minor); - if (n == -1) + if (TAO_debug_level >= 4) { - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - %p\n"), - ACE_TEXT ("TAO_GIOP_Message_Base::handle_input, read_buffer[1] \n"))); - return -1; - } - else if (n == 0) - { - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - %p\n"), - ACE_TEXT ("TAO_GIOP_Message_Base::handle_input, read_buffer[2]\n"))); - return -1; - } + size_t len = TAO_GIOP_MESSAGE_HEADER_LEN ; - this->message_state_.current_offset += n; + char *buf = this->message_handler_.message_block ()->rd_ptr (); + buf -= len; + size_t msg_len = + this->message_handler_.message_block ()->length () + len; - if (this->message_state_.current_offset == - this->message_state_.message_size) - { - if (TAO_debug_level >= 4) - { - size_t header_len = TAO_GIOP_MESSAGE_HEADER_LEN ; - - // Need to include GIOPlite too. - char *buf = this->message_state_.cdr.rd_ptr (); - buf -= header_len; - size_t msg_len = this->message_state_.cdr.length () + header_len; - this->dump_msg ("recv", - ACE_reinterpret_cast (u_char *, - buf), - msg_len); - } + TAO_GIOP_Message_Base::dump_msg ("recv", + ACE_reinterpret_cast (u_char *, + buf), + msg_len); } - - return this->message_state_.is_complete (); + return this->message_handler_.is_message_ready (); } @@ -353,7 +273,7 @@ TAO_GIOP_Message_Base::format_message (TAO_OutputCDR &stream) TAO_Pluggable_Message_Type TAO_GIOP_Message_Base::message_type (void) { - switch (this->message_state_.message_type) + switch (this->message_handler_.message_state ().message_type) { case TAO_GIOP_REQUEST: case TAO_GIOP_LOCATEREQUEST: @@ -395,7 +315,10 @@ TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport, // @@@@Is it necessary here? this->output_->reset (); - // + // Get the Message Block from the handler + ACE_Message_Block *msg_block = + this->message_handler_.message_block (); + // Take out all the information from the <message_state> and reset // it so that nested upcall on the same transport can be handled. // @@ -404,10 +327,14 @@ TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport, // time because the reactor does not call handle_input() for the // same Event_Handler in two threads at the same time. - // Steal the input CDR from the message state. - TAO_InputCDR input_cdr (ACE_InputCDR::Transfer_Contents (this->message_state_.cdr), + // Steal the input CDR from the message block + TAO_InputCDR input_cdr (msg_block->data_block (), + ACE_CDR_BYTE_ORDER, orb_core); + + + // Send the message state for the service layer like FT to log the // messages // @@@ Needed for DOORS @@ -415,11 +342,13 @@ TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport, // Reset the message state. Now, we are ready for the next nested // upcall if any. - this->message_state_.reset (0); + // ###########Wrong???? + msg_block->reset (); + this->message_handler_.message_state ().reset (0); // We know we have some request message. Check whether it is a // GIOP_REQUEST or GIOP_LOCATE_REQUEST to take action. - switch (this->message_state_.message_type) + switch (this->message_handler_.message_state ().message_type) { case TAO_GIOP_REQUEST: // Should be taken care by the state specific invocations. They @@ -442,16 +371,31 @@ TAO_GIOP_Message_Base::process_reply_message ( TAO_Pluggable_Reply_Params ¶ms ) { + // Get the Message Block from the handler + ACE_Message_Block *msg_block = + this->message_handler_.message_block (); + + // Steal the input CDR from the message block + TAO_InputCDR input_cdr (msg_block->data_block (), + ACE_CDR_BYTE_ORDER, + orb_core); + + // Reset the message state. Now, we are ready for the next nested + // upcall if any. + // ###########Wrong???? + msg_block->reset (); + this->message_handler_.message_state ().reset (0); + // We know we have some reply message. Check whether it is a // GIOP_REPLY or GIOP_LOCATE_REPLY to take action. - switch (this->message_state_.message_type) + switch (this->message_handler_.message_state ().message_type) { case TAO_GIOP_REPLY: // Should be taken care by the state specific parsing - return this->generator_parser_->parse_reply (this->message_state_.cdr, + return this->generator_parser_->parse_reply (input_cdr, params); case TAO_GIOP_LOCATEREPLY: - return this->generator_parser_->parse_locate_reply (this->message_state_.cdr, + return this->generator_parser_->parse_locate_reply (input_cdr, params); default: return -1; @@ -939,151 +883,6 @@ TAO_GIOP_Message_Base::send_error (TAO_Transport *transport) } -int -TAO_GIOP_Message_Base::parse_header (void) -{ - char *buf = this->message_state_.cdr.rd_ptr (); - - // Let us be specific that it is for 1.0 - if (this->message_state_.giop_version.minor == 0 && - this->message_state_.giop_version.minor == 1) - { - this->message_state_.byte_order = - buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET]; - if (TAO_debug_level > 2 - && this->message_state_.byte_order != 0 && - this->message_state_.byte_order != 1) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) invalid byte order <%d>") - ACE_TEXT (" for version <1.0>\n"), - this->message_state_.byte_order)); - return -1; - } - } - else - { - // Read teh byte ORDER - this->message_state_.byte_order = - (CORBA::Octet) (buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET]& 0x01); - - // Read the fragment bit - this->message_state_.more_fragments = - (CORBA::Octet) (buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET]& 0x02); - - if (TAO_debug_level > 2 - && (buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET] & ~0x3) != 0) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) invalid flags for <%d>") - ACE_TEXT (" for version <%d %d> \n"), - buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET], - this->message_state_.giop_version.major, - this->message_state_.giop_version.minor)); - return -1; - } - } - - // Get the message type - this->message_state_.message_type = - buf[TAO_GIOP_MESSAGE_TYPE_OFFSET]; - - // Reset our input CDR stream - this->message_state_.cdr.reset_byte_order (this->message_state_.byte_order); - - - this->message_state_.cdr.skip_bytes (TAO_GIOP_MESSAGE_SIZE_OFFSET); - this->message_state_.cdr.read_ulong (this->message_state_.message_size); - - if (TAO_debug_level > 2) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) Parsed header = <%d,%d,%d,%d,%d>\n"), - this->message_state_.giop_version.major, - this->message_state_.giop_version.minor, - this->message_state_.byte_order, - this->message_state_.message_type, - this->message_state_.message_size)); - } - - return 1; -} - - -int -TAO_GIOP_Message_Base::parse_magic_bytes (void) -{ - // Grab the read pointer - char *buf = this->message_state_.cdr.rd_ptr (); - - // The values are hard-coded to support non-ASCII platforms. - if (!(buf [0] == 0x47 // 'G' - && buf [1] == 0x49 // 'I' - && buf [2] == 0x4f // 'O' - && buf [3] == 0x50)) // 'P' - { - // For the present... - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) bad header, magic word [%c%c%c%c]\n"), - buf[0], - buf[1], - buf[2], - buf[3])); - return -1; - } - - if (this->validate_version () == -1) - { - if (TAO_debug_level > 0) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%N|%l|%P|%t) Error in validating") - ACE_TEXT ("revision \n"))); - return -1; - } - } - - return 0; -} - - -int -TAO_GIOP_Message_Base::validate_version (void) -{ - // Grab the read pointer - char *buf = this->message_state_.cdr.rd_ptr (); - - CORBA::Octet incoming_major = - buf[TAO_GIOP_VERSION_MAJOR_OFFSET]; - CORBA::Octet incoming_minor = - buf[TAO_GIOP_VERSION_MINOR_OFFSET]; - - if (this->tao_giop_impl_.check_revision (incoming_major, - incoming_minor) == 0) - { - if (TAO_debug_level > 0) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t|%N|%l) bad version <%d.%d>\n"), - incoming_major, incoming_minor)); - } - - return -1; - } - - // Sets the version - this->message_state_.giop_version.minor = incoming_minor; - this->message_state_.giop_version.major = incoming_major; - - // Sets the state - this->set_state (incoming_major, - incoming_minor); - - return 0; -} - - void TAO_GIOP_Message_Base::set_state (CORBA::Octet def_major, CORBA::Octet def_minor) diff --git a/TAO/tao/GIOP_Message_Base.h b/TAO/tao/GIOP_Message_Base.h index 53a6ca5180c..81879a2fbbd 100644 --- a/TAO/tao/GIOP_Message_Base.h +++ b/TAO/tao/GIOP_Message_Base.h @@ -22,7 +22,7 @@ #endif /* ACE_LACKS_PRAGMA_ONCE */ #include "tao/GIOP_Message_Generator_Parser_Impl.h" -#include "tao/GIOP_Message_State.h" +#include "tao/GIOP_Message_Handler.h" #include "tao/GIOP_Utils.h" class TAO_Pluggable_Reply_Params; @@ -145,18 +145,6 @@ private: /// Send error messages int send_error (TAO_Transport *transport); - /// Parses the header of the GIOP messages for validity - int parse_header (void); - - /// Validates the first 4 bytes that contain the magic word - /// "GIOP". Also calls the validate_version () on the incoming - /// stream. - int parse_magic_bytes (void); - - /// This will do a validation of the stream that arrive in the - /// transport. - int validate_version (void); - /// Set the state void set_state (CORBA::Octet major, CORBA::Octet minor); @@ -190,9 +178,9 @@ private: private: - /// The message state. It represents the status of the messages that - /// have been read from the connection. - TAO_GIOP_Message_State message_state_; + /// Thr message handler object that does reading and parsing of the + /// incoming messages + TAO_GIOP_Message_Handler message_handler_; /// Output CDR TAO_OutputCDR *output_; diff --git a/TAO/tao/GIOP_Message_Generator_Parser_Impl.h b/TAO/tao/GIOP_Message_Generator_Parser_Impl.h index 38560d05864..ab222fd640b 100644 --- a/TAO/tao/GIOP_Message_Generator_Parser_Impl.h +++ b/TAO/tao/GIOP_Message_Generator_Parser_Impl.h @@ -37,9 +37,10 @@ class TAO_Export TAO_GIOP_Message_Generator_Parser_Impl { public: + /// Performs a check of the revision numbers - CORBA::Boolean check_revision (CORBA::Octet incoming_major, - CORBA::Octet incoming_minor); + static CORBA::Boolean check_revision (CORBA::Octet incoming_major, + CORBA::Octet incoming_minor); /// Version 1.0 of GIOP TAO_GIOP_Message_Generator_Parser_10 tao_giop_10; diff --git a/TAO/tao/GIOP_Message_Handler.cpp b/TAO/tao/GIOP_Message_Handler.cpp new file mode 100644 index 00000000000..20a6ff9b322 --- /dev/null +++ b/TAO/tao/GIOP_Message_Handler.cpp @@ -0,0 +1,291 @@ +#include "tao/GIOP_Message_Handler.h" +#include "tao/Pluggable.h" +#include "tao/debug.h" +#include "tao/GIOP_Message_Generator_Parser_Impl.h" + +#if !defined (__ACE_INLINE__) +# include "tao/GIOP_Message_Handler.inl" +#endif /* __ACE_INLINE__ */ + + + + + +ACE_RCSID(tao, GIOP_Message_Handler, "$Id$") + + + +TAO_GIOP_Message_Handler:: + TAO_GIOP_Message_Handler (TAO_ORB_Core * orb_core) + : message_status_ (TAO_GIOP_WAITING_FOR_HEADER), + message_size_ (ACE_CDR::DEFAULT_BUFSIZE), + current_buffer_ (message_size_), + message_state_ (orb_core) +{ +} + + +int +TAO_GIOP_Message_Handler::read_parse_message (TAO_Transport *transport) +{ + // Read the message from the transport + ssize_t n = transport->read (this->current_buffer_.wr_ptr (), + this->message_size_); + + if (n == -1) + { + if (errno == EWOULDBLOCK) + return 0; + else if (errno == ECONNRESET) + { + // @@ Is this OK?? + + // We got a connection reset (TCP RSET) from the other side, + // i.e., they didn't initiate a proper shutdown. + // + // Make it look like things are OK to the upper layer. + errno = 0; + return 0; + } + return -1; + } + // @@ What are the other error handling here?? + else if (n == 0) + { + return -1; + } + + // Now we have a succesful read. First adjust the read pointer + this->current_buffer_.wr_ptr (n); + + // Check what message are we waiting for and take suitable action + if (this->message_status_ == TAO_GIOP_WAITING_FOR_HEADER) + { + if (this->current_buffer_.length () >= + TAO_GIOP_MESSAGE_HEADER_LEN) + { + if (this->parse_header () == -1) + return -1; + } + } + + /*if (this->message_status_ == TAO_GIOP_WAITING_FOR_PAYLOAD) + { + // If the length of the buffer is greater than the size of the + // message that we received then process that message. If not + // just return allowing the reactor to call us back. + if (this->current_buffer_.length () < (this->message_status_.message_size + + TAO_GIOP_MESSAGE_HEADER_LEN)) + return 0; + else + { + // We have payloads that we need to process + if (this->parse_payload () == -1) + return -1; + } + }*/ + + return 0; +} + +int +TAO_GIOP_Message_Handler::parse_header (void) +{ + // Check whether we have a GIOP Message in the first place + if (this->parse_magic_bytes () == -1) + return -1; + + // Grab the read pointer + char *buf = this->current_buffer_.rd_ptr (); + + // Let us be specific that it is for 1.0 + if (this->message_state_.giop_version.minor == 0 && + this->message_state_.giop_version.minor == 1) + { + this->message_state_.byte_order = + buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET]; + if (TAO_debug_level > 2 + && this->message_state_.byte_order != 0 && + this->message_state_.byte_order != 1) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) invalid byte order <%d>") + ACE_TEXT (" for version <1.0>\n"), + this->message_state_.byte_order)); + return -1; + } + } + else + { + // Read the byte ORDER + this->message_state_.byte_order = + (CORBA::Octet) (buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET]& 0x01); + + // Read the fragment bit + this->message_state_.more_fragments = + (CORBA::Octet) (buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET]& 0x02); + + if (TAO_debug_level > 2 + && (buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET] & ~0x3) != 0) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) invalid flags for <%d>") + ACE_TEXT (" for version <%d %d> \n"), + buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET], + this->message_state_.giop_version.major, + this->message_state_.giop_version.minor)); + return -1; + } + } + + // Get the message type + this->message_state_.message_type = + buf[TAO_GIOP_MESSAGE_TYPE_OFFSET]; + + + + + // Get the payload size. If the payload size is greater than the + // length then set the length of the message block to that size + this->message_state_.message_size = this->get_payload_size (); + + if (TAO_debug_level > 2) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) Parsed header = <%d,%d,%d,%d,%d>\n"), + this->message_state_.giop_version.major, + this->message_state_.giop_version.minor, + this->message_state_.byte_order, + this->message_state_.message_type, + this->message_state_.message_size)); + } + + // The GIOP header has been parsed. Set the status to wait for payload + this->message_status_ = TAO_GIOP_WAITING_FOR_PAYLOAD; + + return 0; +} + + +int +TAO_GIOP_Message_Handler::parse_magic_bytes (void) +{ + // Grab the read pointer + char *buf = this->current_buffer_.rd_ptr (); + + // The values are hard-coded to support non-ASCII platforms. + if (!(buf [0] == 0x47 // 'G' + && buf [1] == 0x49 // 'I' + && buf [2] == 0x4f // 'O' + && buf [3] == 0x50)) // 'P' + { + // For the present... + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) bad header, magic word [%c%c%c%c]\n"), + buf[0], + buf[1], + buf[2], + buf[3])); + return -1; + } + + // We have a GIOP message on hand. Get its revision numbers + CORBA::Octet incoming_major = + buf[TAO_GIOP_VERSION_MAJOR_OFFSET]; + CORBA::Octet incoming_minor = + buf[TAO_GIOP_VERSION_MINOR_OFFSET]; + + if (TAO_GIOP_Message_Generator_Parser_Impl::check_revision ( + incoming_major, + incoming_minor) == 0) + { + if (TAO_debug_level > 0) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t|%N|%l) bad version <%d.%d>\n"), + incoming_major, incoming_minor)); + } + + return -1; + } + + // Set the version + this->message_state_.giop_version.minor = incoming_minor; + this->message_state_.giop_version.major = incoming_major; + + return 0; +} + + +CORBA::ULong +TAO_GIOP_Message_Handler::get_payload_size (void) +{ + // Set the read pointer in <current_buffer_> to point to the size of + // the payload + this->current_buffer_.rd_ptr (TAO_GIOP_MESSAGE_SIZE_OFFSET); + + // No. of bytes occupied by the message size in the header. + size_t msg_size = 4; + + char *buf = ACE_ptr_align_binary (this->current_buffer_.rd_ptr (), + msg_size); + + CORBA::ULong x; +#if !defined (ACE_DISABLE_SWAP_ON_READ) + if (!(this->message_state_.byte_order != ACE_CDR_BYTE_ORDER)) + { + x = *ACE_reinterpret_cast (ACE_CDR::ULong*, buf); + } + else + { + ACE_CDR::swap_4 (buf, ACE_reinterpret_cast (char*, &x)); + } +#else + x = *ACE_reinterpret_cast(ACE_CDR::ULong*, buf); +#endif /* ACE_DISABLE_SWAP_ON_READ */ + + if ((x + TAO_GIOP_MESSAGE_HEADER_LEN) > this->message_size_) + { + // Increase the size of the <current_buffer_> + this->current_buffer_.size (x + TAO_GIOP_MESSAGE_HEADER_LEN); + } + + // Set the read pointer to the end of the GIOP header + this->current_buffer_.rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN - + TAO_GIOP_MESSAGE_SIZE_OFFSET); + return x; +} + +int +TAO_GIOP_Message_Handler::is_message_ready (void) +{ + if (this->message_status_ == TAO_GIOP_WAITING_FOR_PAYLOAD) + { + // If the length of the buffer is greater than the size of the + // message that we received then process that message. If not + // just return allowing the reactor to call us back. + if (this->current_buffer_.length () < (this->message_state_.message_size + + TAO_GIOP_MESSAGE_HEADER_LEN)) + return 0; + } + + this->message_status_ = TAO_GIOP_WAITING_FOR_HEADER; + + // We have atleast one message for processing + return 1; +} +/*int +TAO_GIOP_Message_Handler::parse_payload (void) +{ + if (this->current_buffer_.length () == + (this->message_state_.message_size + TAO_GIOP_MESSAGE_HEADER_LEN)) + { + // We have exactly one message in the buffer + + // Reset our input CDR stream + this->message_state_.cdr.reset_byte_order + (this->message_state_.byte_order); + } +} +*/ diff --git a/TAO/tao/GIOP_Message_Handler.h b/TAO/tao/GIOP_Message_Handler.h new file mode 100644 index 00000000000..4afb85e53a8 --- /dev/null +++ b/TAO/tao/GIOP_Message_Handler.h @@ -0,0 +1,120 @@ +// This may look like C, but it's really -*- C++ -*- +// -*- C++ -*- +// =================================================================== +/** + * @file GIOP_Message_Handler.h + * + * $Id$ + * + * @author Balachandran Natarajan <bala@cs.wustl.edu> + **/ +// =================================================================== + +#ifndef TAO_GIOP_MESSAGE_HANDLER_H +#define TAO_GIOP_MESSAGE_HANDLER_H +#include "ace/pre.h" +#include "ace/Message_Block.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "tao/GIOP_Message_State.h" + +class TAO_Transport; +class TAO_ORB_Core *orb_core; + +enum TAO_GIOP_Message_Status +{ + /// The buffer is waiting for the header of the message yet + TAO_GIOP_WAITING_FOR_HEADER = 0, + + /// The buffer is waiting for the payload to appear on the socket + TAO_GIOP_WAITING_FOR_PAYLOAD +}; + +/** + * @class TAO_GIOP_Message_Handler + * + * @brief GIOP specific message handler class + * + * This class does some of the message handling for GIOP. This class + * reads the message from the socket, splits the messages to create a + * CDR stream out of it and passes that to the higher layers of the ORB. + * The read from the socket is done using a single 'read' instead of + * reading the header and the payload seperately. + */ + +class TAO_GIOP_Message_Handler +{ +public: + + /// Ctor + TAO_GIOP_Message_Handler (TAO_ORB_Core *orb_core); + + /// Read the message from the transport in to the + /// <current_buffer_>. This method delegates responsibility of + /// parsing to some of the helper methods. + int read_parse_message (TAO_Transport *transport); + + /// Check whether we have atleast one complete message ready for + /// processing. + int is_message_ready (void); + + /// Return the underlying message state + TAO_GIOP_Message_State &message_state (void); + + /// Return the pointer to the underlying Message Block + ACE_Message_Block *message_block (void); + +private: + + /// Parses the header information from the <current_buffer_>. + int parse_header (void); + + /// Validates the first 4 bytes that contain the magic word + /// "GIOP". Also calls the validate_version () on the incoming + /// stream. + int parse_magic_bytes (void); + + /// Gets the size of the payload from the <current_buffer_>. If the + /// size of the current buffer is less than the payload size, the + /// size of the buffer is increased. + CORBA::ULong get_payload_size (void); + +private: + + /// The state of the message in the buffer + TAO_GIOP_Message_Status message_status_; + + /// The size of the message that is being read of the socket. This + /// value is originally set to 1024 bytes. It is reset if we start + /// receiving messages with payloads greater than that. The current + /// value of <message_size_> would be the size of the last message + /// received (ie. payload+headers). + size_t message_size_; + + /// The buffer. rd_ptr() points to the beginning of the current + /// message, properly aligned wr_ptr() points to where the next + /// read() should put the data. + ACE_Message_Block current_buffer_; + + /// The message state. It represents the status of the messages that + /// have been read from the current_buffer_ + TAO_GIOP_Message_State message_state_; +}; + + +const size_t TAO_GIOP_MESSAGE_HEADER_LEN = 12; +const size_t TAO_GIOP_MESSAGE_SIZE_OFFSET = 8; +const size_t TAO_GIOP_MESSAGE_FLAGS_OFFSET = 6; +const size_t TAO_GIOP_MESSAGE_TYPE_OFFSET = 7; +const size_t TAO_GIOP_VERSION_MINOR_OFFSET = 5; +const size_t TAO_GIOP_VERSION_MAJOR_OFFSET = 4; + +#if defined (__ACE_INLINE__) +# include "tao/GIOP_Message_Handler.inl" +#endif /* __ACE_INLINE__ */ + +#include "ace/post.h" +#endif /*TAO_GIOP_MESSAGE_HANDLER_H*/ diff --git a/TAO/tao/GIOP_Message_Handler.inl b/TAO/tao/GIOP_Message_Handler.inl new file mode 100644 index 00000000000..1a692bc6fe3 --- /dev/null +++ b/TAO/tao/GIOP_Message_Handler.inl @@ -0,0 +1,15 @@ +// -*- C++ -*- +// $Id$ + +ACE_INLINE TAO_GIOP_Message_State & +TAO_GIOP_Message_Handler::message_state (void) +{ + return this->message_state_; +} + +ACE_INLINE ACE_Message_Block * +TAO_GIOP_Message_Handler::message_block (void) +{ + // The read pointer should be after the GIOP header + return &this->current_buffer_; +} diff --git a/TAO/tao/IIOP_Connection_Handler.cpp b/TAO/tao/IIOP_Connection_Handler.cpp index 5c4d950c15f..4c32b6ffe0c 100644 --- a/TAO/tao/IIOP_Connection_Handler.cpp +++ b/TAO/tao/IIOP_Connection_Handler.cpp @@ -326,10 +326,11 @@ TAO_IIOP_Connection_Handler::handle_input_i (ACE_HANDLE, if (this->refcount_ == 0) this->decr_ref_count (); - if (result == 0 || result == -1) - { - return result; - } + if (result == -1) + return result; + else if (result == 0) + // Requires another call to handle_input () + return 1; return 0; } diff --git a/TAO/tao/IIOP_Transport.cpp b/TAO/tao/IIOP_Transport.cpp index e1b4a09b042..6453937dd6a 100644 --- a/TAO/tao/IIOP_Transport.cpp +++ b/TAO/tao/IIOP_Transport.cpp @@ -146,6 +146,18 @@ TAO_IIOP_Transport::recv (char *buf, } +ssize_t +TAO_IIOP_Transport::read (char *buf, + size_t len, + const ACE_Time_Value *max_wait_time) +{ + return ACE::recv (this->handle (), + (void *) buf, + len); +} + + + int TAO_IIOP_Transport::read_process_message (ACE_Time_Value *max_wait_time, int block) diff --git a/TAO/tao/IIOP_Transport.h b/TAO/tao/IIOP_Transport.h index c6c5aa96b74..31b438e4feb 100644 --- a/TAO/tao/IIOP_Transport.h +++ b/TAO/tao/IIOP_Transport.h @@ -96,6 +96,11 @@ public: size_t len, const ACE_Time_Value *s = 0); + /// Read len bytes from the socket into buf + virtual ssize_t read (char *buf, + size_t len, + const ACE_Time_Value *s = 0); + /// Read and process the message from the connection. The processing /// of the message is done by delegating the work to the underlying /// messaging object diff --git a/TAO/tao/Pluggable.h b/TAO/tao/Pluggable.h index fe9afcee201..dbb60b99db3 100644 --- a/TAO/tao/Pluggable.h +++ b/TAO/tao/Pluggable.h @@ -123,6 +123,16 @@ public: size_t len, const ACE_Time_Value *s = 0) = 0; + /** + * Try to read len bytes from into buf. + * @@ The ACE_Time_Value *s is just a place holder for now. It is + * not clear this this is the best place to specify this. The actual + * timeout values will be kept in the Policies. + */ + virtual ssize_t read (char *buf, + size_t len, + const ACE_Time_Value *s = 0) = 0; + /// Fill into <output> the right headers to make a request. virtual void start_request (TAO_ORB_Core *orb_core, diff --git a/TAO/tao/TAO.dsp b/TAO/tao/TAO.dsp index 3fb49920812..9450a5b9cb2 100644 --- a/TAO/tao/TAO.dsp +++ b/TAO/tao/TAO.dsp @@ -1316,6 +1316,25 @@ SOURCE=.\GIOP_Message_Generator_Parser_Impl.cpp # End Source File
# Begin Source File
+SOURCE=.\GIOP_Message_Handler.cpp
+
+!IF "$(CFG)" == "TAO DLL - Win32 Alpha Release"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 Alpha Debug"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 MFC Release"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 MFC Debug"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 Release"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 Debug"
+
+!ENDIF
+
+# End Source File
+# Begin Source File
+
SOURCE=.\GIOP_Message_Lite.cpp
!IF "$(CFG)" == "TAO DLL - Win32 Alpha Release"
@@ -3825,6 +3844,10 @@ SOURCE=.\GIOP_Message_Generator_Parser_Impl.h # End Source File
# Begin Source File
+SOURCE=.\GIOP_Message_Handler.h
+# End Source File
+# Begin Source File
+
SOURCE=.\GIOP_Message_Lite.h
# End Source File
# Begin Source File
@@ -4553,6 +4576,10 @@ SOURCE=.\GIOP_Message_Generator_Parser_Impl.inl # End Source File
# Begin Source File
+SOURCE=.\GIOP_Message_Handler.inl
+# End Source File
+# Begin Source File
+
SOURCE=.\GIOP_Message_Lite.i
# End Source File
# Begin Source File
diff --git a/TAO/tao/TAO_Static.dsp b/TAO/tao/TAO_Static.dsp index 9f4ba5b334e..9defbc20642 100644 --- a/TAO/tao/TAO_Static.dsp +++ b/TAO/tao/TAO_Static.dsp @@ -323,6 +323,10 @@ SOURCE=.\GIOP_Message_Generator_Parser_Impl.h # End Source File
# Begin Source File
+SOURCE=.\GIOP_Message_Handler.h
+# End Source File
+# Begin Source File
+
SOURCE=.\GIOP_Message_Lite.h
# End Source File
# Begin Source File
@@ -1051,6 +1055,10 @@ SOURCE=.\GIOP_Message_Generator_Parser_Impl.inl # End Source File
# Begin Source File
+SOURCE=.\GIOP_Message_Handler.inl
+# End Source File
+# Begin Source File
+
SOURCE=.\GIOP_Message_Lite.i
# End Source File
# Begin Source File
@@ -1687,6 +1695,10 @@ SOURCE=.\GIOP_Message_Generator_Parser_Impl.cpp # End Source File
# Begin Source File
+SOURCE=.\GIOP_Message_Handler.cpp
+# End Source File
+# Begin Source File
+
SOURCE=.\GIOP_Message_Lite.cpp
# End Source File
# Begin Source File
diff --git a/TAO/tao/things_that_needs b/TAO/tao/things_that_needs new file mode 100644 index 00000000000..9e8267e520a --- /dev/null +++ b/TAO/tao/things_that_needs @@ -0,0 +1,2 @@ +- Fragmentation +- Mutliple message reads |