diff options
Diffstat (limited to 'TAO/tao/GIOP_Message_Reactive_Handler.cpp')
-rw-r--r-- | TAO/tao/GIOP_Message_Reactive_Handler.cpp | 574 |
1 files changed, 574 insertions, 0 deletions
diff --git a/TAO/tao/GIOP_Message_Reactive_Handler.cpp b/TAO/tao/GIOP_Message_Reactive_Handler.cpp new file mode 100644 index 00000000000..a8af560081a --- /dev/null +++ b/TAO/tao/GIOP_Message_Reactive_Handler.cpp @@ -0,0 +1,574 @@ +// $Id$ + +#include "tao/GIOP_Message_Reactive_Handler.h" +#include "tao/GIOP_Message_Generator_Parser_Impl.h" +#include "tao/ORB_Core.h" +#include "tao/Pluggable.h" +#include "tao/debug.h" +#include "tao/GIOP_Message_Base.h" +#include "Transport.h" + +#if !defined (__ACE_INLINE__) +# include "tao/GIOP_Message_Reactive_Handler.inl" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(tao, GIOP_Message_Reactive_Handler, "$Id$") + +TAO_GIOP_Message_Reactive_Handler::TAO_GIOP_Message_Reactive_Handler (TAO_ORB_Core * orb_core, + TAO_GIOP_Message_Base *base, + size_t input_cdr_size) + : message_state_ (orb_core), + mesg_base_ (base), + message_status_ (TAO_GIOP_WAITING_FOR_HEADER), + message_size_ (input_cdr_size), + current_buffer_ (orb_core->data_block_for_message_block (input_cdr_size)), + supp_buffer_ (orb_core->data_block_for_message_block (input_cdr_size)) + +{ + // NOTE: The message blocks here use a locked allocator which is not + // from the TSS even if there is one. We are getting the allocators + // from the global memory. We shouldn't be using the TSS stuff for + // the following reason + // (a) The connection handlers are per-connection and not + // per-thread. + // (b) The order of cleaning is important if we use allocators from + // TSS. The TSS goes away when the threads go away. But the + // connection handlers go away only when the ORB decides to shut + // it down. + ACE_CDR::mb_align (&this->current_buffer_); + + // Calculate the effective message after alignment + this->message_size_ -= this->rd_pos (); +} + + + +int +TAO_GIOP_Message_Reactive_Handler::read_messages (TAO_Transport *transport) +{ + // Read the message from the transport. The size of the message read + // is the maximum size of the buffer that we have less the amount of + // data that has already been read in to the buffer. + ssize_t n = transport->recv (this->current_buffer_.wr_ptr (), + this->current_buffer_.space ()); + + if (n == -1) + { + if (errno == EWOULDBLOCK) + return 0; + + return -1; + } + // @@ What are the other error handling here?? + else if (n == 0) + { + return -1; + } + + if (TAO_debug_level == 5) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - GIOP_Message_Reactive_Handler::read_messages" + " received %d bytes\n", + n)); + + size_t len; + for (size_t offset = 0; offset < size_t(n); offset += len) + { + len = n - offset; + if (len > 512) + len = 512; + ACE_HEX_DUMP ((LM_DEBUG, + this->current_buffer_.wr_ptr () + offset, + len, + "TAO (%P|%t) - read_messages ")); + } + ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - received %d bytes \n", n)); + } + + + // Now we have a succesful read. First adjust the write pointer + this->current_buffer_.wr_ptr (n); + + + // Success + return 1; +} + +int +TAO_GIOP_Message_Reactive_Handler::parse_message_header (void) +{ + // Check what message are we waiting for and take suitable action + if (this->message_status_ == TAO_GIOP_WAITING_FOR_HEADER) + { + size_t len = this->current_buffer_.length (); + char *buf = this->current_buffer_.rd_ptr (); + + if (len > TAO_GIOP_MESSAGE_HEADER_LEN) + { + // Parse the GIOP header + if (this->parse_message_header_i (buf) == -1) + + return -1; + + int retval = this->parse_fragment_header (buf + TAO_GIOP_MESSAGE_HEADER_LEN, + len); + + // Set the pointer read pointer position in the + // <current_buffer_> + size_t pos = TAO_GIOP_MESSAGE_HEADER_LEN; + + if (retval) + { + // We had a fragment header, so the position should be + // beyond that + pos += TAO_GIOP_MESSAGE_FRAGMENT_HEADER; + } + + this->current_buffer_.rd_ptr (pos); + buf = this->current_buffer_.rd_ptr (); + + // The GIOP header has been parsed. Set the status to wait for + // payload + this->message_status_ = TAO_GIOP_WAITING_FOR_PAYLOAD; + + return 1; + } + + // We dont have sufficient information to decipher the GIOP + // header. Make sure that the reactor calls us back. + return -2; + } + + // The last read just "read" left-over messages + return 0; +} + +int +TAO_GIOP_Message_Reactive_Handler::is_message_ready (void) +{ + if (this->message_status_ == TAO_GIOP_WAITING_FOR_PAYLOAD) + { + size_t len = this->current_buffer_.length (); + char *buf = this->current_buffer_.rd_ptr (); + + // Set the buf pointer to the start of the GIOP header + buf -= TAO_GIOP_MESSAGE_HEADER_LEN; + + // Dump the incoming message . It will be dumped only if the + // debug level is greater than 5 anyway. + this->mesg_base_->dump_msg ( + "Recv msg", + ACE_reinterpret_cast (u_char *, + buf), + len + TAO_GIOP_MESSAGE_HEADER_LEN); + if (len == this->message_state_.message_size) + { + // If the buffer length is equal to the size of the payload we + // have exactly one message. + this->message_status_ = TAO_GIOP_WAITING_FOR_HEADER; + + // Check whether we have received only the first part of the + // fragment. + return this->message_state_.is_complete (this->current_buffer_); + } + else if (len > this->message_state_.message_size) + { + // If the length is greater we have received some X messages + // and a part of X + 1 messages (probably) with X varying + // from 1 to N. + this->message_status_ = TAO_GIOP_MULTIPLE_MESSAGES; + + // Clone the data that we read in. + this->supp_buffer_.data_block ( + this->current_buffer_.data_block ()->clone ()); + + // Set the read and write pointer for the supplementary + // buffer. + size_t rd_pos = this->rd_pos (); + this->supp_buffer_.rd_ptr (rd_pos + + this->message_state_.message_size); + this->supp_buffer_.wr_ptr (this->wr_pos ()); + + // Reset the current buffer + this->current_buffer_.reset (); + + // Set the read and write pointers again for the current + // buffer. We change the write pointer settings as we would + // like to process a single message. + this->current_buffer_.rd_ptr (rd_pos); + this->current_buffer_.wr_ptr (rd_pos + + this->message_state_.message_size); + + return this->message_state_.is_complete (this->current_buffer_); + } + } + else if (this->message_status_ == TAO_GIOP_MULTIPLE_MESSAGES) + { + size_t len = this->supp_buffer_.length (); + + if (len > TAO_GIOP_MESSAGE_HEADER_LEN) + { + // @@ What about fragment headers??? + this->current_buffer_.copy (this->supp_buffer_.rd_ptr (), + TAO_GIOP_MESSAGE_HEADER_LEN); + + this->supp_buffer_.rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN); + + len = this->current_buffer_.length (); + char *buf = this->current_buffer_.rd_ptr (); + + if (this->parse_message_header_i (buf) == -1) + + return -1; + + // Set the pointer read pointer position in the + // <current_buffer_> + this->current_buffer_.rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN); + + // The GIOP header has been parsed. Set the status to wait for + // payload + this->message_status_ = TAO_GIOP_WAITING_FOR_PAYLOAD; + + return this->get_message (); + } + else + { + // We have smaller than the header size left here. We + // just copy the rest of the stuff and reset things so that + // we can read the rest of the stuff from the socket. + this->current_buffer_.copy ( + this->supp_buffer_.rd_ptr (), + len); + + // Reset the supp buffer now + this->supp_buffer_.reset (); + + this->message_status_ = TAO_GIOP_WAITING_FOR_HEADER; + } + + } + + // Just send us back to the reactor so that we can for more data to + // come in . + return 0; +} + +ACE_Data_Block * +TAO_GIOP_Message_Reactive_Handler::steal_data_block (void) +{ + ACE_Data_Block *db = + this->current_buffer_.data_block ()->clone_nocopy (); + + ACE_Data_Block *old_db = + this->current_buffer_.replace_data_block (db); + + ACE_CDR::mb_align (&this->current_buffer_); + + return old_db; +} + + +void +TAO_GIOP_Message_Reactive_Handler::reset (int reset_flag) +{ + // Reset the contents of the message state + this->message_state_.reset (reset_flag); + + // Reset the current buffer + this->current_buffer_.reset (); + + ACE_CDR::mb_align (&this->current_buffer_); + + if (this->message_status_ != TAO_GIOP_MULTIPLE_MESSAGES) + { + this->supp_buffer_.reset (); + ACE_CDR::mb_align (&this->supp_buffer_); + } + +} + + +int +TAO_GIOP_Message_Reactive_Handler::parse_message_header_i (char *buf) +{ + if (TAO_debug_level > 8) + { + ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - parsing header\n")); + } + + // Check whether we have a GIOP Message in the first place + if (this->parse_magic_bytes (buf) == -1) + return -1; + + // Let us be specific that this is for 1.0 + if (this->message_state_.giop_version.minor == 0 && + this->message_state_.giop_version.major == 1) + { + this->message_state_.byte_order = + buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET]; + + if (this->message_state_.byte_order != 0 && + this->message_state_.byte_order != 1) + { + if (TAO_debug_level > 2) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) invalid byte order <%d>") + ACE_TEXT (" for version <1.0>\n"), + this->message_state_.byte_order)); + return -1; + } + } + else + { + // Read the byte ORDER + this->message_state_.byte_order = + (CORBA::Octet) (buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET]& 0x01); + + // Read the fragment bit + this->message_state_.more_fragments = + (CORBA::Octet) (buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET]& 0x02); + + if ((buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET] & ~0x3) != 0) + { + if (TAO_debug_level > 2) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) invalid flags for <%d>") + ACE_TEXT (" for version <%d %d> \n"), + buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET], + this->message_state_.giop_version.major, + this->message_state_.giop_version.minor)); + return -1; + } + } + + // Get the message type + this->message_state_.message_type = + buf[TAO_GIOP_MESSAGE_TYPE_OFFSET]; + + // Get the payload size. If the payload size is greater than the + // length then set the length of the message block to that + // size. Move the rd_ptr to the end of the GIOP header + this->message_state_.message_size = this->get_payload_size (buf); + + // If the message_size or the payload_size is zero then something + // is fishy. So return an error. + if (this->message_state_.message_size == 0) + return -1; + + if (TAO_debug_level > 2) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) Parsed header = <%d,%d,%d,%d,%d>\n"), + this->message_state_.giop_version.major, + this->message_state_.giop_version.minor, + this->message_state_.byte_order, + this->message_state_.message_type, + this->message_state_.message_size)); + } + + return 1; +} + + +int +TAO_GIOP_Message_Reactive_Handler::parse_fragment_header (char *buf, + size_t length) +{ + size_t len = + TAO_GIOP_MESSAGE_FRAGMENT_HEADER + TAO_GIOP_MESSAGE_HEADER_LEN; + + // By this point we are doubly sure that we have a more or less + // valid GIOP message with a valid major revision number. + if (this->message_state_.more_fragments && + this->message_state_.giop_version.minor == 2 && + length > len) + { + // Fragmented message in GIOP 1.2 should have a fragment header + // following the GIOP header. Grab the rd_ptr to get that + // info. + this->message_state_.request_id = this->read_ulong (buf); + + // As we parsed the header + return 1; + } + + return 0; +} + + +int +TAO_GIOP_Message_Reactive_Handler::parse_magic_bytes (char *buf) +{ + // The values are hard-coded to support non-ASCII platforms. + if (!(buf [0] == 0x47 // 'G' + && buf [1] == 0x49 // 'I' + && buf [2] == 0x4f // 'O' + && buf [3] == 0x50)) // 'P' + { + // For the present... + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) bad header, " + "magic word [%2.2x,%2.2x,%2.2x,%2.2x]\n"), + buf[0], + buf[1], + buf[2], + buf[3])); + return -1; + } + + // We have a GIOP message on hand. Get its revision numbers + CORBA::Octet incoming_major = + buf[TAO_GIOP_VERSION_MAJOR_OFFSET]; + CORBA::Octet incoming_minor = + buf[TAO_GIOP_VERSION_MINOR_OFFSET]; + + if (TAO_GIOP_Message_Generator_Parser_Impl::check_revision ( + incoming_major, + incoming_minor) == 0) + { + if (TAO_debug_level > 0) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t|%N|%l) bad version <%d.%d>\n"), + incoming_major, incoming_minor)); + } + + return -1; + } + + // Set the version + this->message_state_.giop_version.minor = incoming_minor; + this->message_state_.giop_version.major = incoming_major; + + return 0; +} + + +CORBA::ULong +TAO_GIOP_Message_Reactive_Handler::get_payload_size (char *rd_ptr) +{ + // Move the read pointer + rd_ptr += TAO_GIOP_MESSAGE_SIZE_OFFSET; + + CORBA::ULong x = this->read_ulong (rd_ptr); + + if ((x + TAO_GIOP_MESSAGE_HEADER_LEN) > this->message_size_) + { + if (ACE_CDR::grow (&this->current_buffer_, + x + TAO_GIOP_MESSAGE_HEADER_LEN) == -1) + { + if (TAO_debug_level > 0) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%P | %t) Unable to increase the size \n") + ACE_TEXT ("of the buffer \n"))); + return 0; + } + + // New message size is the size of the now larger buffer. + this->message_size_ = x + + TAO_GIOP_MESSAGE_HEADER_LEN + + ACE_CDR::MAX_ALIGNMENT; + } + + // Set the read pointer to the end of the GIOP message + // this->current_buffer_.rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN); + return x; +} + +CORBA::ULong +TAO_GIOP_Message_Reactive_Handler::read_ulong (const char *ptr) +{ + size_t msg_size = 4; + + char *buf = ACE_ptr_align_binary (ptr, + msg_size); + + CORBA::ULong x; +#if !defined (ACE_DISABLE_SWAP_ON_READ) + if (!(this->message_state_.byte_order != ACE_CDR_BYTE_ORDER)) + { + x = *ACE_reinterpret_cast (ACE_CDR::ULong*, buf); + } + else + { + ACE_CDR::swap_4 (buf, ACE_reinterpret_cast (char*, &x)); + } +#else + x = *ACE_reinterpret_cast(ACE_CDR::ULong*, buf); +#endif /* ACE_DISABLE_SWAP_ON_READ */ + + return x; +} + + + + +int +TAO_GIOP_Message_Reactive_Handler::get_message (void) +{ + if (this->message_status_ == TAO_GIOP_WAITING_FOR_PAYLOAD) + { + size_t len = this->supp_buffer_.length (); + char * buf = + this->current_buffer_.rd_ptr (); + + buf -= TAO_GIOP_MESSAGE_HEADER_LEN; + + if (len == this->message_state_.message_size) + { + // If the buffer length is equal to the size of the payload we + // have exactly one message. Check whether we have received + // only the first part of the fragment. + this->message_status_ = TAO_GIOP_WAITING_FOR_HEADER; + this->current_buffer_.copy (this->supp_buffer_.rd_ptr (), + this->message_state_.message_size); + + // The message will be dumped only if the debug level is + // greater than 5 anyway. + this->mesg_base_->dump_msg ( + "Recv msg", + ACE_reinterpret_cast (u_char *, + buf), + len + + TAO_GIOP_MESSAGE_HEADER_LEN); + + this->supp_buffer_.rd_ptr (this->message_state_.message_size); + return this->message_state_.is_complete (this->current_buffer_); + } + else if (len > this->message_state_.message_size) + { + // If the length is greater we have received some X messages + // and a part of X + 1 messages (probably) with X varying + // from 1 to N. + this->message_status_ = TAO_GIOP_MULTIPLE_MESSAGES; + + this->current_buffer_.copy (this->supp_buffer_.rd_ptr (), + this->message_state_.message_size); + + // The message will be dumped only if the debug level is + // greater than 5 anyway. + this->mesg_base_->dump_msg ( + "Recv msg", + ACE_reinterpret_cast (u_char *, + buf), + len + + TAO_GIOP_MESSAGE_HEADER_LEN); + + this->supp_buffer_.rd_ptr (this->message_state_.message_size); + return this->message_state_.is_complete (this->current_buffer_); + } + else + { + // The remaining message in the supp buffer + this->current_buffer_.copy (this->supp_buffer_.rd_ptr (), + this->supp_buffer_.length ()); + + // Reset the supp buffer now + this->supp_buffer_.reset (); + } + } + + return 0; +} |