summaryrefslogtreecommitdiff
path: root/TAO/tao/GIOP_Message_Reactive_Handler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/tao/GIOP_Message_Reactive_Handler.cpp')
-rw-r--r--TAO/tao/GIOP_Message_Reactive_Handler.cpp574
1 files changed, 122 insertions, 452 deletions
diff --git a/TAO/tao/GIOP_Message_Reactive_Handler.cpp b/TAO/tao/GIOP_Message_Reactive_Handler.cpp
index a8af560081a..da22b2e93ab 100644
--- a/TAO/tao/GIOP_Message_Reactive_Handler.cpp
+++ b/TAO/tao/GIOP_Message_Reactive_Handler.cpp
@@ -8,393 +8,95 @@
#include "tao/GIOP_Message_Base.h"
#include "Transport.h"
+#if 0
#if !defined (__ACE_INLINE__)
# include "tao/GIOP_Message_Reactive_Handler.inl"
#endif /* __ACE_INLINE__ */
-ACE_RCSID(tao, GIOP_Message_Reactive_Handler, "$Id$")
+#endif /*if 0*/
-TAO_GIOP_Message_Reactive_Handler::TAO_GIOP_Message_Reactive_Handler (TAO_ORB_Core * orb_core,
- TAO_GIOP_Message_Base *base,
- size_t input_cdr_size)
- : message_state_ (orb_core),
- mesg_base_ (base),
- message_status_ (TAO_GIOP_WAITING_FOR_HEADER),
- message_size_ (input_cdr_size),
- current_buffer_ (orb_core->data_block_for_message_block (input_cdr_size)),
- supp_buffer_ (orb_core->data_block_for_message_block (input_cdr_size))
+ACE_RCSID(tao, GIOP_Message_Reactive_Handler, "$Id$")
+TAO_GIOP_Message_Reactive_Handler::TAO_GIOP_Message_Reactive_Handler (
+ TAO_ORB_Core * /*orb_core*/,
+ TAO_GIOP_Message_Base * /*base*/)
{
- // NOTE: The message blocks here use a locked allocator which is not
- // from the TSS even if there is one. We are getting the allocators
- // from the global memory. We shouldn't be using the TSS stuff for
- // the following reason
- // (a) The connection handlers are per-connection and not
- // per-thread.
- // (b) The order of cleaning is important if we use allocators from
- // TSS. The TSS goes away when the threads go away. But the
- // connection handlers go away only when the ORB decides to shut
- // it down.
- ACE_CDR::mb_align (&this->current_buffer_);
-
- // Calculate the effective message after alignment
- this->message_size_ -= this->rd_pos ();
-}
-
-
-
-int
-TAO_GIOP_Message_Reactive_Handler::read_messages (TAO_Transport *transport)
-{
- // Read the message from the transport. The size of the message read
- // is the maximum size of the buffer that we have less the amount of
- // data that has already been read in to the buffer.
- ssize_t n = transport->recv (this->current_buffer_.wr_ptr (),
- this->current_buffer_.space ());
-
- if (n == -1)
- {
- if (errno == EWOULDBLOCK)
- return 0;
-
- return -1;
- }
- // @@ What are the other error handling here??
- else if (n == 0)
- {
- return -1;
- }
-
- if (TAO_debug_level == 5)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - GIOP_Message_Reactive_Handler::read_messages"
- " received %d bytes\n",
- n));
-
- size_t len;
- for (size_t offset = 0; offset < size_t(n); offset += len)
- {
- len = n - offset;
- if (len > 512)
- len = 512;
- ACE_HEX_DUMP ((LM_DEBUG,
- this->current_buffer_.wr_ptr () + offset,
- len,
- "TAO (%P|%t) - read_messages "));
- }
- ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - received %d bytes \n", n));
- }
-
-
- // Now we have a succesful read. First adjust the write pointer
- this->current_buffer_.wr_ptr (n);
-
- // Success
- return 1;
}
-int
-TAO_GIOP_Message_Reactive_Handler::parse_message_header (void)
-{
- // Check what message are we waiting for and take suitable action
- if (this->message_status_ == TAO_GIOP_WAITING_FOR_HEADER)
- {
- size_t len = this->current_buffer_.length ();
- char *buf = this->current_buffer_.rd_ptr ();
-
- if (len > TAO_GIOP_MESSAGE_HEADER_LEN)
- {
- // Parse the GIOP header
- if (this->parse_message_header_i (buf) == -1)
-
- return -1;
-
- int retval = this->parse_fragment_header (buf + TAO_GIOP_MESSAGE_HEADER_LEN,
- len);
-
- // Set the pointer read pointer position in the
- // <current_buffer_>
- size_t pos = TAO_GIOP_MESSAGE_HEADER_LEN;
-
- if (retval)
- {
- // We had a fragment header, so the position should be
- // beyond that
- pos += TAO_GIOP_MESSAGE_FRAGMENT_HEADER;
- }
-
- this->current_buffer_.rd_ptr (pos);
- buf = this->current_buffer_.rd_ptr ();
-
- // The GIOP header has been parsed. Set the status to wait for
- // payload
- this->message_status_ = TAO_GIOP_WAITING_FOR_PAYLOAD;
-
- return 1;
- }
-
- // We dont have sufficient information to decipher the GIOP
- // header. Make sure that the reactor calls us back.
- return -2;
- }
-
- // The last read just "read" left-over messages
- return 0;
-}
int
-TAO_GIOP_Message_Reactive_Handler::is_message_ready (void)
+TAO_GIOP_Message_Reactive_Handler::parse_message_header (ACE_Message_Block &incoming,
+ TAO_GIOP_Message_State &state)
{
- if (this->message_status_ == TAO_GIOP_WAITING_FOR_PAYLOAD)
- {
- size_t len = this->current_buffer_.length ();
- char *buf = this->current_buffer_.rd_ptr ();
-
- // Set the buf pointer to the start of the GIOP header
- buf -= TAO_GIOP_MESSAGE_HEADER_LEN;
-
- // Dump the incoming message . It will be dumped only if the
- // debug level is greater than 5 anyway.
- this->mesg_base_->dump_msg (
- "Recv msg",
- ACE_reinterpret_cast (u_char *,
- buf),
- len + TAO_GIOP_MESSAGE_HEADER_LEN);
- if (len == this->message_state_.message_size)
- {
- // If the buffer length is equal to the size of the payload we
- // have exactly one message.
- this->message_status_ = TAO_GIOP_WAITING_FOR_HEADER;
-
- // Check whether we have received only the first part of the
- // fragment.
- return this->message_state_.is_complete (this->current_buffer_);
- }
- else if (len > this->message_state_.message_size)
- {
- // If the length is greater we have received some X messages
- // and a part of X + 1 messages (probably) with X varying
- // from 1 to N.
- this->message_status_ = TAO_GIOP_MULTIPLE_MESSAGES;
-
- // Clone the data that we read in.
- this->supp_buffer_.data_block (
- this->current_buffer_.data_block ()->clone ());
-
- // Set the read and write pointer for the supplementary
- // buffer.
- size_t rd_pos = this->rd_pos ();
- this->supp_buffer_.rd_ptr (rd_pos +
- this->message_state_.message_size);
- this->supp_buffer_.wr_ptr (this->wr_pos ());
-
- // Reset the current buffer
- this->current_buffer_.reset ();
-
- // Set the read and write pointers again for the current
- // buffer. We change the write pointer settings as we would
- // like to process a single message.
- this->current_buffer_.rd_ptr (rd_pos);
- this->current_buffer_.wr_ptr (rd_pos +
- this->message_state_.message_size);
-
- return this->message_state_.is_complete (this->current_buffer_);
- }
- }
- else if (this->message_status_ == TAO_GIOP_MULTIPLE_MESSAGES)
+ // @@Bala: Need to make a check whether we are waiting for the
+ // header...
+ if (incoming.length () > TAO_GIOP_MESSAGE_HEADER_LEN)
{
- size_t len = this->supp_buffer_.length ();
-
- if (len > TAO_GIOP_MESSAGE_HEADER_LEN)
- {
- // @@ What about fragment headers???
- this->current_buffer_.copy (this->supp_buffer_.rd_ptr (),
- TAO_GIOP_MESSAGE_HEADER_LEN);
-
- this->supp_buffer_.rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN);
-
- len = this->current_buffer_.length ();
- char *buf = this->current_buffer_.rd_ptr ();
-
- if (this->parse_message_header_i (buf) == -1)
-
- return -1;
-
- // Set the pointer read pointer position in the
- // <current_buffer_>
- this->current_buffer_.rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN);
-
- // The GIOP header has been parsed. Set the status to wait for
- // payload
- this->message_status_ = TAO_GIOP_WAITING_FOR_PAYLOAD;
-
- return this->get_message ();
- }
- else
- {
- // We have smaller than the header size left here. We
- // just copy the rest of the stuff and reset things so that
- // we can read the rest of the stuff from the socket.
- this->current_buffer_.copy (
- this->supp_buffer_.rd_ptr (),
- len);
-
- // Reset the supp buffer now
- this->supp_buffer_.reset ();
-
- this->message_status_ = TAO_GIOP_WAITING_FOR_HEADER;
- }
-
+ // Parse the GIOP header
+ if (this->parse_message_header_i (incoming, state) == -1)
+ return -1;
}
- // Just send us back to the reactor so that we can for more data to
- // come in .
return 0;
}
-ACE_Data_Block *
-TAO_GIOP_Message_Reactive_Handler::steal_data_block (void)
-{
- ACE_Data_Block *db =
- this->current_buffer_.data_block ()->clone_nocopy ();
-
- ACE_Data_Block *old_db =
- this->current_buffer_.replace_data_block (db);
-
- ACE_CDR::mb_align (&this->current_buffer_);
-
- return old_db;
-}
-
-
-void
-TAO_GIOP_Message_Reactive_Handler::reset (int reset_flag)
-{
- // Reset the contents of the message state
- this->message_state_.reset (reset_flag);
-
- // Reset the current buffer
- this->current_buffer_.reset ();
-
- ACE_CDR::mb_align (&this->current_buffer_);
-
- if (this->message_status_ != TAO_GIOP_MULTIPLE_MESSAGES)
- {
- this->supp_buffer_.reset ();
- ACE_CDR::mb_align (&this->supp_buffer_);
- }
-
-}
-
-
int
-TAO_GIOP_Message_Reactive_Handler::parse_message_header_i (char *buf)
+TAO_GIOP_Message_Reactive_Handler::parse_message_header_i (ACE_Message_Block &incoming,
+ TAO_GIOP_Message_State &state)
{
if (TAO_debug_level > 8)
{
ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - parsing header\n"));
}
- // Check whether we have a GIOP Message in the first place
- if (this->parse_magic_bytes (buf) == -1)
- return -1;
+ // Grab the rd_ptr_ from the message block..
+ char *buf = incoming.rd_ptr ();
- // Let us be specific that this is for 1.0
- if (this->message_state_.giop_version.minor == 0 &&
- this->message_state_.giop_version.major == 1)
+ // Parse the magic bytes first
+ if (this->parse_magic_bytes (buf) == -1)
{
- this->message_state_.byte_order =
- buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET];
-
- if (this->message_state_.byte_order != 0 &&
- this->message_state_.byte_order != 1)
- {
- if (TAO_debug_level > 2)
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) invalid byte order <%d>")
- ACE_TEXT (" for version <1.0>\n"),
- this->message_state_.byte_order));
- return -1;
- }
+ return -1;
}
- else
- {
- // Read the byte ORDER
- this->message_state_.byte_order =
- (CORBA::Octet) (buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET]& 0x01);
- // Read the fragment bit
- this->message_state_.more_fragments =
- (CORBA::Octet) (buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET]& 0x02);
+ // Get the version information
+ if (this->get_version_info (buf, state) == -1)
+ return -1;
- if ((buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET] & ~0x3) != 0)
- {
- if (TAO_debug_level > 2)
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) invalid flags for <%d>")
- ACE_TEXT (" for version <%d %d> \n"),
- buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET],
- this->message_state_.giop_version.major,
- this->message_state_.giop_version.minor));
- return -1;
- }
- }
+ // Get the byte order information...
+ if (this->get_byte_order_info (buf, state) == -1)
+ return -1;
// Get the message type
- this->message_state_.message_type =
- buf[TAO_GIOP_MESSAGE_TYPE_OFFSET];
-
- // Get the payload size. If the payload size is greater than the
- // length then set the length of the message block to that
- // size. Move the rd_ptr to the end of the GIOP header
- this->message_state_.message_size = this->get_payload_size (buf);
+ state.message_type = buf[TAO_GIOP_MESSAGE_TYPE_OFFSET];
- // If the message_size or the payload_size is zero then something
- // is fishy. So return an error.
- if (this->message_state_.message_size == 0)
- return -1;
+ // Get the payload size
+ this->get_payload_size (buf, state);
- if (TAO_debug_level > 2)
- {
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) Parsed header = <%d,%d,%d,%d,%d>\n"),
- this->message_state_.giop_version.major,
- this->message_state_.giop_version.minor,
- this->message_state_.byte_order,
- this->message_state_.message_type,
- this->message_state_.message_size));
- }
+ // Parse the
+ int retval = this->parse_fragment_header (buf + TAO_GIOP_MESSAGE_HEADER_LEN,
+ len);
- return 1;
-}
+ // Set the pointer read pointer position in the
+ // <current_buffer_>
+ size_t pos = TAO_GIOP_MESSAGE_HEADER_LEN;
+ if (retval)
+ {
+ // We had a fragment header, so the position should be
+ // beyond that
+ pos += TAO_GIOP_MESSAGE_FRAGMENT_HEADER;
+ }
-int
-TAO_GIOP_Message_Reactive_Handler::parse_fragment_header (char *buf,
- size_t length)
-{
- size_t len =
- TAO_GIOP_MESSAGE_FRAGMENT_HEADER + TAO_GIOP_MESSAGE_HEADER_LEN;
-
- // By this point we are doubly sure that we have a more or less
- // valid GIOP message with a valid major revision number.
- if (this->message_state_.more_fragments &&
- this->message_state_.giop_version.minor == 2 &&
- length > len)
- {
- // Fragmented message in GIOP 1.2 should have a fragment header
- // following the GIOP header. Grab the rd_ptr to get that
- // info.
- this->message_state_.request_id = this->read_ulong (buf);
+ this->current_buffer_.rd_ptr (pos);
+ buf = this->current_buffer_.rd_ptr ();
- // As we parsed the header
- return 1;
+ // The GIOP header has been parsed. Set the status to wait for
+ // payload
+ this->message_status_ = TAO_GIOP_WAITING_FOR_PAYLOAD;
}
+ return 1;
- return 0;
-}
int
@@ -406,7 +108,7 @@ TAO_GIOP_Message_Reactive_Handler::parse_magic_bytes (char *buf)
&& buf [2] == 0x4f // 'O'
&& buf [3] == 0x50)) // 'P'
{
- // For the present...
+ // For the present...
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("TAO (%P|%t) bad header, "
@@ -418,12 +120,25 @@ TAO_GIOP_Message_Reactive_Handler::parse_magic_bytes (char *buf)
return -1;
}
+ return 0;
+}
+
+int
+TAO_GIOP_Message_Reactive_Handler::get_version_info (char *buf,
+ TAO_GIOP_Message_State &state)
+{
+ if (TAO_debug_level > 8)
+ {
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) Getting version info.. \n"));
+ }
+
// We have a GIOP message on hand. Get its revision numbers
CORBA::Octet incoming_major =
buf[TAO_GIOP_VERSION_MAJOR_OFFSET];
CORBA::Octet incoming_minor =
buf[TAO_GIOP_VERSION_MINOR_OFFSET];
+ // Check the revision information
if (TAO_GIOP_Message_Generator_Parser_Impl::check_revision (
incoming_major,
incoming_minor) == 0)
@@ -439,55 +154,81 @@ TAO_GIOP_Message_Reactive_Handler::parse_magic_bytes (char *buf)
}
// Set the version
- this->message_state_.giop_version.minor = incoming_minor;
- this->message_state_.giop_version.major = incoming_major;
+ state.giop_version.minor = incoming_minor;
+ state.giop_version.major = incoming_major;
return 0;
}
+int
+TAO_GIOP_Message_Reactive_Handler::get_byte_order_info (char *buf,
+ TAO_GIOP_Message_State &message_state)
+{
+ if (TAO_debug_level > 8)
+ {
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) Getting byte order info.. \n"));
+ }
+
+ // Let us be specific that this is for 1.0
+ if (message_state.giop_version.minor == 0 &&
+ message_state.giop_version.major == 1)
+ {
+ message_state_.byte_order =
+ buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET];
+
+ if (message_state.byte_order != 0 &&
+ message_state.byte_order != 1)
+ {
+ if (TAO_debug_level > 2)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) invalid byte order <%d>")
+ ACE_TEXT (" for version <1.0>\n"),
+ message_state.byte_order));
+ return -1;
+ }
+ }
+ else
+ {
+ // Read the byte ORDER
+ message_state.byte_order =
+ (CORBA::Octet) (buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET]& 0x01);
+
+ // Read the fragment bit
+ message_state.more_fragments =
+ (CORBA::Octet) (buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET]& 0x02);
+
+ if ((buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET] & ~0x3) != 0)
+ {
+ if (TAO_debug_level > 2)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) invalid flags for <%d>")
+ ACE_TEXT (" for version <%d %d> \n"),
+ buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET],
+ message_state_.giop_version.major,
+ message_state_.giop_version.minor));
+ return -1;
+ }
+ }
+
+ return 0;
+}
CORBA::ULong
-TAO_GIOP_Message_Reactive_Handler::get_payload_size (char *rd_ptr)
+TAO_GIOP_Message_Reactive_Handler::get_payload_size (char *rd_ptr,
+ TAO_GIOP_Message_State &message_state)
{
// Move the read pointer
rd_ptr += TAO_GIOP_MESSAGE_SIZE_OFFSET;
- CORBA::ULong x = this->read_ulong (rd_ptr);
-
- if ((x + TAO_GIOP_MESSAGE_HEADER_LEN) > this->message_size_)
- {
- if (ACE_CDR::grow (&this->current_buffer_,
- x + TAO_GIOP_MESSAGE_HEADER_LEN) == -1)
- {
- if (TAO_debug_level > 0)
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("(%P | %t) Unable to increase the size \n")
- ACE_TEXT ("of the buffer \n")));
- return 0;
- }
-
- // New message size is the size of the now larger buffer.
- this->message_size_ = x +
- TAO_GIOP_MESSAGE_HEADER_LEN +
- ACE_CDR::MAX_ALIGNMENT;
- }
-
- // Set the read pointer to the end of the GIOP message
- // this->current_buffer_.rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN);
- return x;
-}
+ CORBA::ULong x = 0;
-CORBA::ULong
-TAO_GIOP_Message_Reactive_Handler::read_ulong (const char *ptr)
-{
size_t msg_size = 4;
- char *buf = ACE_ptr_align_binary (ptr,
+ char *buf = ACE_ptr_align_binary (rd_ptr,
msg_size);
- CORBA::ULong x;
#if !defined (ACE_DISABLE_SWAP_ON_READ)
- if (!(this->message_state_.byte_order != ACE_CDR_BYTE_ORDER))
+ if (!(state.byte_order != ACE_CDR_BYTE_ORDER))
{
x = *ACE_reinterpret_cast (ACE_CDR::ULong*, buf);
}
@@ -499,76 +240,5 @@ TAO_GIOP_Message_Reactive_Handler::read_ulong (const char *ptr)
x = *ACE_reinterpret_cast(ACE_CDR::ULong*, buf);
#endif /* ACE_DISABLE_SWAP_ON_READ */
- return x;
-}
-
-
-
-
-int
-TAO_GIOP_Message_Reactive_Handler::get_message (void)
-{
- if (this->message_status_ == TAO_GIOP_WAITING_FOR_PAYLOAD)
- {
- size_t len = this->supp_buffer_.length ();
- char * buf =
- this->current_buffer_.rd_ptr ();
-
- buf -= TAO_GIOP_MESSAGE_HEADER_LEN;
-
- if (len == this->message_state_.message_size)
- {
- // If the buffer length is equal to the size of the payload we
- // have exactly one message. Check whether we have received
- // only the first part of the fragment.
- this->message_status_ = TAO_GIOP_WAITING_FOR_HEADER;
- this->current_buffer_.copy (this->supp_buffer_.rd_ptr (),
- this->message_state_.message_size);
-
- // The message will be dumped only if the debug level is
- // greater than 5 anyway.
- this->mesg_base_->dump_msg (
- "Recv msg",
- ACE_reinterpret_cast (u_char *,
- buf),
- len +
- TAO_GIOP_MESSAGE_HEADER_LEN);
-
- this->supp_buffer_.rd_ptr (this->message_state_.message_size);
- return this->message_state_.is_complete (this->current_buffer_);
- }
- else if (len > this->message_state_.message_size)
- {
- // If the length is greater we have received some X messages
- // and a part of X + 1 messages (probably) with X varying
- // from 1 to N.
- this->message_status_ = TAO_GIOP_MULTIPLE_MESSAGES;
-
- this->current_buffer_.copy (this->supp_buffer_.rd_ptr (),
- this->message_state_.message_size);
-
- // The message will be dumped only if the debug level is
- // greater than 5 anyway.
- this->mesg_base_->dump_msg (
- "Recv msg",
- ACE_reinterpret_cast (u_char *,
- buf),
- len +
- TAO_GIOP_MESSAGE_HEADER_LEN);
-
- this->supp_buffer_.rd_ptr (this->message_state_.message_size);
- return this->message_state_.is_complete (this->current_buffer_);
- }
- else
- {
- // The remaining message in the supp buffer
- this->current_buffer_.copy (this->supp_buffer_.rd_ptr (),
- this->supp_buffer_.length ());
-
- // Reset the supp buffer now
- this->supp_buffer_.reset ();
- }
- }
-
- return 0;
+ message_state.message_size = x;
}