summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbala <balanatarajan@users.noreply.github.com>2001-01-01 16:48:50 +0000
committerbala <balanatarajan@users.noreply.github.com>2001-01-01 16:48:50 +0000
commit235429120b036b50d4b1067fdaf83f5c4ca6784f (patch)
treed6e40c6350b5bf734ca4d3c0bfaa3fc1d876cf02
parent128ae0d08cdbfed47b3b5a6077bb7ac91319ec1b (diff)
downloadATCD-235429120b036b50d4b1067fdaf83f5c4ca6784f.tar.gz
*** empty log message ***
-rw-r--r--TAO/tao/GIOP_Message_Base.cpp93
-rw-r--r--TAO/tao/GIOP_Message_Base.h3
-rw-r--r--TAO/tao/GIOP_Message_Handler.cpp167
-rw-r--r--TAO/tao/GIOP_Message_Handler.h42
-rw-r--r--TAO/tao/GIOP_Message_Handler.inl36
-rw-r--r--TAO/tao/GIOP_Message_Lite.cpp4
-rw-r--r--TAO/tao/GIOP_Message_State.cpp85
-rw-r--r--TAO/tao/GIOP_Message_State.h15
-rw-r--r--TAO/tao/GIOP_Message_State.i1
-rw-r--r--TAO/tao/IIOP_Connection_Handler.cpp3
-rw-r--r--TAO/tao/IIOP_Transport.cpp16
-rw-r--r--TAO/tao/Pluggable_Messaging.h8
-rw-r--r--TAO/tao/TAO.dsp19
13 files changed, 341 insertions, 151 deletions
diff --git a/TAO/tao/GIOP_Message_Base.cpp b/TAO/tao/GIOP_Message_Base.cpp
index 5232776bb4b..6358a2159b1 100644
--- a/TAO/tao/GIOP_Message_Base.cpp
+++ b/TAO/tao/GIOP_Message_Base.cpp
@@ -198,21 +198,28 @@ TAO_GIOP_Message_Base::read_message (TAO_Transport *transport,
this->set_state (state.giop_version.major,
state.giop_version.minor);
- if (TAO_debug_level >= 4)
+
+
+ int retval = this->message_handler_.is_message_ready ();
+
+ if (retval == 1)
{
- size_t len = TAO_GIOP_MESSAGE_HEADER_LEN ;
+ if (TAO_debug_level >= 4)
+ {
+ size_t len = TAO_GIOP_MESSAGE_HEADER_LEN ;
- char *buf = this->message_handler_.rd_ptr ();
- buf -= len;
- size_t msg_len =
- state.message_size + len;
+ char *buf = this->message_handler_.rd_ptr ();
+ buf -= len;
+ size_t msg_len =
+ state.message_size + len;
- this->dump_msg ("recv",
- ACE_reinterpret_cast (u_char *,
+ this->dump_msg ("recv",
+ ACE_reinterpret_cast (u_char *,
buf),
- msg_len);
+ msg_len);
+ }
}
- return this->message_handler_.is_message_ready ();
+ return retval;
}
@@ -332,8 +339,8 @@ TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport,
// generally required as we are not going to write anything. But
// this is *important* for checking the length of the CDR streams
size_t n = this->message_handler_.message_state ().message_size;
- msg_block.wr_ptr (n + TAO_GIOP_MESSAGE_HEADER_LEN);
- msg_block.rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN);
+ msg_block.wr_ptr (this->message_handler_.wr_pos ());
+ msg_block.rd_ptr (this->message_handler_.rd_pos ());
// Steal the input CDR from the message block
TAO_InputCDR input_cdr (&msg_block,
@@ -1066,7 +1073,6 @@ TAO_GIOP_Message_Base::send_reply_exception (
return transport->send_message (output);
}
-
void
TAO_GIOP_Message_Base::dump_msg (const char *label,
const u_char *ptr,
@@ -1140,6 +1146,7 @@ TAO_GIOP_Message_Base::dump_msg (const char *label,
}
+
int
TAO_GIOP_Message_Base::generate_locate_reply_header (
TAO_OutputCDR & /*cdr*/,
@@ -1156,3 +1163,63 @@ TAO_GIOP_Message_Base::is_ready_for_bidirectional (void)
// we know.
return this->generator_parser_->is_ready_for_bidirectional ();
}
+
+int
+TAO_GIOP_Message_Base::more_messages (void)
+{
+ // Does the handler have more messages for processing?
+ int retval = this->message_handler_.more_messages ();
+
+ if (retval == TAO_MESSAGE_BLOCK_COMPLETE ||
+ retval == TAO_MESSAGE_BLOCK_INCOMPLETE)
+ return 1;
+
+ // Get the message state
+ TAO_GIOP_Message_State &state =
+ this->message_handler_.message_state ();
+
+ // Set the state internally for parsing and generating messages
+ this->set_state (state.giop_version.major,
+ state.giop_version.minor);
+
+ if (TAO_debug_level >= 4)
+ {
+ size_t len = TAO_GIOP_MESSAGE_HEADER_LEN ;
+
+ char *buf = this->message_handler_.rd_ptr ();
+ buf -= len;
+ size_t msg_len =
+ state.message_size + len;
+
+ this->dump_msg ("recv",
+ ACE_reinterpret_cast (u_char *,
+ buf),
+ msg_len);
+ }
+
+ retval = this->message_handler_.is_message_ready ();
+
+ if (retval == 1)
+ {
+ if (TAO_debug_level >= 4)
+ {
+ size_t len = TAO_GIOP_MESSAGE_HEADER_LEN ;
+
+ char *buf = this->message_handler_.rd_ptr ();
+ buf -= len;
+ size_t msg_len =
+ state.message_size + len;
+
+ this->dump_msg ("recv",
+ ACE_reinterpret_cast (u_char *,
+ buf),
+ msg_len);
+ }
+
+ // We have a message ready. This can be processed by the higher
+ // layers
+ return TAO_MESSAGE_BLOCK_NEEDS_PROCESSING;
+ }
+
+ return retval;
+}
diff --git a/TAO/tao/GIOP_Message_Base.h b/TAO/tao/GIOP_Message_Base.h
index 81879a2fbbd..5cc80029d9e 100644
--- a/TAO/tao/GIOP_Message_Base.h
+++ b/TAO/tao/GIOP_Message_Base.h
@@ -176,6 +176,9 @@ private:
/// request/response?
virtual int is_ready_for_bidirectional (void);
+ /// Are there any more messages that needs processing
+ virtual int more_messages (void);
+
private:
/// Thr message handler object that does reading and parsing of the
diff --git a/TAO/tao/GIOP_Message_Handler.cpp b/TAO/tao/GIOP_Message_Handler.cpp
index 41f748f53fb..02c54980156 100644
--- a/TAO/tao/GIOP_Message_Handler.cpp
+++ b/TAO/tao/GIOP_Message_Handler.cpp
@@ -11,8 +11,6 @@
-
-
ACE_RCSID(tao, GIOP_Message_Handler, "$Id$")
@@ -25,6 +23,7 @@ TAO_GIOP_Message_Handler::TAO_GIOP_Message_Handler (TAO_ORB_Core * orb_core)
// data portion from this buffer in the skeleton. Why?? Needs
// investigation.
//current_buffer_ (orb_core->create_input_cdr_data_block (ACE_CDR::DEFAULT_BUFSIZE)),
+ supp_buffer_ (ACE_CDR::DEFAULT_BUFSIZE),
message_state_ (orb_core)
{
}
@@ -60,7 +59,7 @@ TAO_GIOP_Message_Handler::read_parse_message (TAO_Transport *transport)
return -1;
}
- // Now we have a succesful read. First adjust the read pointer
+ // Now we have a succesful read. First adjust the write pointer
this->current_buffer_.wr_ptr (n);
// Check what message are we waiting for and take suitable action
@@ -69,27 +68,10 @@ TAO_GIOP_Message_Handler::read_parse_message (TAO_Transport *transport)
if (this->current_buffer_.length () >=
TAO_GIOP_MESSAGE_HEADER_LEN)
{
- if (this->parse_header () == -1)
- return -1;
+ return this->parse_header ();
}
}
- /*if (this->message_status_ == TAO_GIOP_WAITING_FOR_PAYLOAD)
- {
- // If the length of the buffer is greater than the size of the
- // message that we received then process that message. If not
- // just return allowing the reactor to call us back.
- if (this->current_buffer_.length () < (this->message_status_.message_size +
- TAO_GIOP_MESSAGE_HEADER_LEN))
- return 0;
- else
- {
- // We have payloads that we need to process
- if (this->parse_payload () == -1)
- return -1;
- }
- }*/
-
return 0;
}
@@ -147,11 +129,9 @@ TAO_GIOP_Message_Handler::parse_header (void)
this->message_state_.message_type =
buf[TAO_GIOP_MESSAGE_TYPE_OFFSET];
-
-
-
// Get the payload size. If the payload size is greater than the
- // length then set the length of the message block to that size
+ // length then set the length of the message block to that
+ // size. Move the rd_ptr to the end of the GIOP header
this->message_state_.message_size = this->get_payload_size ();
if (TAO_debug_level > 2)
@@ -165,10 +145,25 @@ TAO_GIOP_Message_Handler::parse_header (void)
this->message_state_.message_size));
}
- // The GIOP header has been parsed. Set the status to wait for payload
+ if (this->message_state_.more_fragments &&
+ this->message_state_.giop_version.minor == 2 &&
+ this->current_buffer_.length () > TAO_GIOP_MESSAGE_FRAGMENT_HEADER)
+ {
+ // Fragmented message in GIOP 1.2 should have a fragment header
+ // following the GIOP header. Grab the rd_ptr to get that
+ // info.
+ buf = this->current_buffer_.rd_ptr ();
+ this->message_state_.request_id = this->read_ulong (buf);
+
+ // Move the read pointer to the end of the fragment header
+ this->current_buffer_.rd_ptr (TAO_GIOP_MESSAGE_FRAGMENT_HEADER);
+ }
+
+ // The GIOP header has been parsed. Set the status to wait for
+ // payload
this->message_status_ = TAO_GIOP_WAITING_FOR_PAYLOAD;
- return 0;
+ return 1;
}
@@ -230,10 +225,27 @@ TAO_GIOP_Message_Handler::get_payload_size (void)
// the payload
this->current_buffer_.rd_ptr (TAO_GIOP_MESSAGE_SIZE_OFFSET);
- // No. of bytes occupied by the message size in the header.
+ CORBA::ULong x = this->read_ulong (this->current_buffer_.rd_ptr ());
+
+ if ((x + TAO_GIOP_MESSAGE_HEADER_LEN) > this->message_size_)
+ {
+ // Increase the size of the <current_buffer_>
+ this->current_buffer_.size (x + TAO_GIOP_MESSAGE_HEADER_LEN);
+ this->message_size_ = x + TAO_GIOP_MESSAGE_HEADER_LEN;
+ }
+
+ // Set the read pointer to the end of the GIOP header
+ this->current_buffer_.rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN -
+ TAO_GIOP_MESSAGE_SIZE_OFFSET);
+ return x;
+}
+
+CORBA::ULong
+TAO_GIOP_Message_Handler::read_ulong (const char *ptr)
+{
size_t msg_size = 4;
- char *buf = ACE_ptr_align_binary (this->current_buffer_.rd_ptr (),
+ char *buf = ACE_ptr_align_binary (ptr,
msg_size);
CORBA::ULong x;
@@ -250,15 +262,6 @@ TAO_GIOP_Message_Handler::get_payload_size (void)
x = *ACE_reinterpret_cast(ACE_CDR::ULong*, buf);
#endif /* ACE_DISABLE_SWAP_ON_READ */
- if ((x + TAO_GIOP_MESSAGE_HEADER_LEN) > this->message_size_)
- {
- // Increase the size of the <current_buffer_>
- this->current_buffer_.size (x + TAO_GIOP_MESSAGE_HEADER_LEN);
- }
-
- // Set the read pointer to the end of the GIOP header
- this->current_buffer_.rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN -
- TAO_GIOP_MESSAGE_SIZE_OFFSET);
return x;
}
@@ -267,33 +270,83 @@ TAO_GIOP_Message_Handler::is_message_ready (void)
{
if (this->message_status_ == TAO_GIOP_WAITING_FOR_PAYLOAD)
{
- // If the length of the buffer is greater than the size of the
- // message that we received then process that message. If not
- // just return allowing the reactor to call us back.
- if (this->current_buffer_.length () <
- (this->message_state_.message_size))
+ size_t len = this->current_buffer_.length ();
+ int retval = 0;
+ if (len == this->message_state_.message_size)
{
- return 0;
+ // If the buffer length is equal to the size of the payload we
+ // have exactly one message. Check whether we have received
+ // only the first part of the fragment.
+ this->message_status_ = TAO_GIOP_WAITING_FOR_HEADER;
+ return this->message_state_.is_complete (this->current_buffer_);
+ }
+ else if (len > this->message_state_.message_size)
+ {
+ // If the length is greater we have received some X messages
+ // and a part of X + 1 messages (probably) with X varying
+ // from 1 to N.
+ this->message_status_ = TAO_GIOP_MULTIPLE_MESSAGES;
+
+ // Now copy the first message in to the <supp_buffer_>
+ this->supp_buffer_.size (this->message_state_.message_size);
+ this->supp_buffer_.copy (this->current_buffer_.rd_ptr (),
+ this->message_state_.message_size);
+
+ // We have one of the messages copied. Let us move the
+ // rd_ptr in <current_buffer_> after that message
+ this->current_buffer_.rd_ptr (this->message_state_.message_size);
+
+ return this->message_state_.is_complete (this->supp_buffer_);
}
}
- this->message_status_ = TAO_GIOP_WAITING_FOR_HEADER;
- // We have atleast one message for processing
- return 1;
+
+ // Just return allowing the reactor to call us back to get the rest
+ // of the info
+ return 0;
}
-/*int
-TAO_GIOP_Message_Handler::parse_payload (void)
+
+
+int
+TAO_GIOP_Message_Handler::more_messages (void)
{
- if (this->current_buffer_.length () ==
- (this->message_state_.message_size + TAO_GIOP_MESSAGE_HEADER_LEN))
+ if (this->message_status_ == TAO_GIOP_MULTIPLE_MESSAGES)
{
- // We have exactly one message in the buffer
-
- // Reset our input CDR stream
- this->message_state_.cdr.reset_byte_order
- (this->message_state_.byte_order);
+ if (this->current_buffer_.length () >
+ TAO_GIOP_MESSAGE_HEADER_LEN)
+ return this->parse_header ();
+ else
+ {
+ // We have some message but it is not of suffcieint length
+ // for us to process. We copy that left over piece to the
+ // start of the <current_buffer_> and align the rd_ptr &
+ // wr_ptr.
+ this->align_left_info ();
+ this->message_status_ = TAO_GIOP_WAITING_FOR_HEADER;
+ return TAO_MESSAGE_BLOCK_INCOMPLETE;
+ }
}
+
+ // No more meaningful messages
+ return TAO_MESSAGE_BLOCK_COMPLETE;
+}
+
+void
+TAO_GIOP_Message_Handler::align_left_info (void)
+{
+ // Copy left over stuff in to <supp_buffer_>
+ this->supp_buffer_.copy (this->current_buffer_.rd_ptr (),
+ this->current_buffer_.length ());
+
+ // Reset the current buffer
+ this->current_buffer_.reset ();
+
+ // Copy the info from the <supp_buffer_>
+ this->current_buffer_.copy (this->supp_buffer_.rd_ptr (),
+ this->supp_buffer_.length ());
+
+ // Reset the <supp_buffer_>
+ this->supp_buffer_.reset ();
}
-*/
diff --git a/TAO/tao/GIOP_Message_Handler.h b/TAO/tao/GIOP_Message_Handler.h
index 728cc53cff8..dd573490da4 100644
--- a/TAO/tao/GIOP_Message_Handler.h
+++ b/TAO/tao/GIOP_Message_Handler.h
@@ -30,7 +30,24 @@ enum TAO_GIOP_Message_Status
TAO_GIOP_WAITING_FOR_HEADER = 0,
/// The buffer is waiting for the payload to appear on the socket
- TAO_GIOP_WAITING_FOR_PAYLOAD
+ TAO_GIOP_WAITING_FOR_PAYLOAD,
+
+ /// The buffer has got multiple messages
+ TAO_GIOP_MULTIPLE_MESSAGES
+};
+
+enum TAO_Message_Block_Content_Status
+{
+ /// The buffer has nomore info for processing ie. all information
+ /// have been processed
+ TAO_MESSAGE_BLOCK_COMPLETE = 3,
+
+ /// The buffer has something meaningful and needs processing
+ TAO_MESSAGE_BLOCK_NEEDS_PROCESSING,
+
+ /// The buffer has nothing meaningful. Need to read more data from
+ /// the socket to make the reamaining data meaningful
+ TAO_MESSAGE_BLOCK_INCOMPLETE
};
/**
@@ -61,6 +78,9 @@ public:
/// processing.
int is_message_ready (void);
+ /// Do we have more messages for processing?
+ int more_messages (void);
+
/// Reset the contents of the <current_buffer_> if no more requests
/// need to be processed. We reset the contents of the
/// <message_state_> to parse and process the next request.
@@ -78,6 +98,12 @@ public:
/// Return the rd_ptr of the <current_buffer_>
char *rd_ptr (void) const;
+ /// Return the position of the read pointer in the <current_buffer_>
+ size_t rd_pos (void) const;
+
+ /// Return the position of the write pointer in the <current_buffer_>
+ size_t wr_pos (void) const;
+
private:
/// Parses the header information from the <current_buffer_>.
@@ -93,6 +119,13 @@ private:
/// size of the buffer is increased.
CORBA::ULong get_payload_size (void);
+ /// Extract a CORBA::ULong from the <current_buffer_>
+ CORBA::ULong read_ulong (const char *buf);
+
+ /// Align the left over info in the <current_buffer_> to the start
+ /// of the message block.
+ void align_left_info (void);
+
private:
/// The state of the message in the buffer
@@ -110,6 +143,12 @@ private:
/// read() should put the data.
ACE_Message_Block current_buffer_;
+ /// The supplementary buffer that holds just one message if the
+ /// <current_buffer_> has more than one message. One message from
+ /// the <current_buffer_> is taken and filled in this buffer, which
+ /// is then sent to the higher layers of the ORB.
+ ACE_Message_Block supp_buffer_;
+
/// The message state. It represents the status of the messages that
/// have been read from the current_buffer_
TAO_GIOP_Message_State message_state_;
@@ -122,6 +161,7 @@ const size_t TAO_GIOP_MESSAGE_FLAGS_OFFSET = 6;
const size_t TAO_GIOP_MESSAGE_TYPE_OFFSET = 7;
const size_t TAO_GIOP_VERSION_MINOR_OFFSET = 5;
const size_t TAO_GIOP_VERSION_MAJOR_OFFSET = 4;
+const size_t TAO_GIOP_MESSAGE_FRAGMENT_HEADER = 4;
#if defined (__ACE_INLINE__)
# include "tao/GIOP_Message_Handler.inl"
diff --git a/TAO/tao/GIOP_Message_Handler.inl b/TAO/tao/GIOP_Message_Handler.inl
index 2a69fc1186c..37d15d62e2d 100644
--- a/TAO/tao/GIOP_Message_Handler.inl
+++ b/TAO/tao/GIOP_Message_Handler.inl
@@ -27,7 +27,11 @@ TAO_GIOP_Message_Handler::reset (int /*reset_flag*/)
this->message_state_.reset (0);
// Reset the current buffer
- this->current_buffer_.reset ();
+ if (this->message_status_ != TAO_GIOP_MULTIPLE_MESSAGES)
+ this->current_buffer_.reset ();
+
+ this->supp_buffer_.reset ();
+
}
ACE_INLINE char *
@@ -35,3 +39,33 @@ TAO_GIOP_Message_Handler::rd_ptr (void) const
{
return this->current_buffer_.rd_ptr ();
}
+
+ACE_INLINE size_t
+TAO_GIOP_Message_Handler::rd_pos (void) const
+{
+ if (this->supp_buffer_.length () > 0)
+ {
+ return
+ this->supp_buffer_.rd_ptr () - this->supp_buffer_.base ();
+ }
+ else
+ {
+ return
+ this->current_buffer_.rd_ptr () - this->current_buffer_.base ();
+ }
+}
+
+ACE_INLINE size_t
+TAO_GIOP_Message_Handler::wr_pos (void) const
+{
+ if (this->supp_buffer_.length () > 0)
+ {
+ return
+ this->supp_buffer_.wr_ptr () - this->supp_buffer_.base ();
+ }
+ else
+ {
+ return
+ this->current_buffer_.wr_ptr () - this->current_buffer_.base ();
+ }
+}
diff --git a/TAO/tao/GIOP_Message_Lite.cpp b/TAO/tao/GIOP_Message_Lite.cpp
index f6064bd4c0b..cc3fad61e14 100644
--- a/TAO/tao/GIOP_Message_Lite.cpp
+++ b/TAO/tao/GIOP_Message_Lite.cpp
@@ -280,7 +280,7 @@ TAO_GIOP_Message_Lite::read_message (TAO_Transport *transport,
return -1;
}
- this->message_state_.current_offset += n;
+ current_offset += n;
if (this->message_state_.current_offset ==
this->message_state_.message_size)
@@ -299,7 +299,7 @@ TAO_GIOP_Message_Lite::read_message (TAO_Transport *transport,
}
}
- return this->message_state_.is_complete ();
+ return 1;//this->message_state_.is_complete ();
}
diff --git a/TAO/tao/GIOP_Message_State.cpp b/TAO/tao/GIOP_Message_State.cpp
index fa5abe73c41..cc6b52f520d 100644
--- a/TAO/tao/GIOP_Message_State.cpp
+++ b/TAO/tao/GIOP_Message_State.cpp
@@ -17,12 +17,12 @@ TAO_GIOP_Message_State::TAO_GIOP_Message_State (TAO_ORB_Core* orb_core)
more_fragments (0),
message_type (TAO_GIOP_MESSAGERROR),
message_size (0),
- current_offset (0),
+ request_id (0),
cdr (orb_core->create_input_cdr_data_block (ACE_CDR::DEFAULT_BUFSIZE),
TAO_ENCAP_BYTE_ORDER,
orb_core),
- fragments_begin (0),
- fragments_end (0)
+ // Problem similar to GIOP_Message_handler.cpp - Bala
+ fragmented_messages (ACE_CDR::DEFAULT_BUFSIZE)
{
//giop_version.major = TAO_DEF_GIOP_MAJOR;
//giop_version.minor = TAO_DEF_GIOP_MINOR;
@@ -35,83 +35,84 @@ TAO_GIOP_Message_State::~TAO_GIOP_Message_State (void)
}
int
-TAO_GIOP_Message_State::is_complete ()
+TAO_GIOP_Message_State::is_complete (ACE_Message_Block &current_buf)
{
- if (this->message_size != this->current_offset)
- return 0;
-
if (this->more_fragments)
{
- // This is only one fragment of the complete Request....
- ACE_Message_Block* current =
- this->cdr.steal_contents ();
- if (this->fragments_begin == 0)
+ if (this->fragmented_messages.length () == 0)
{
this->first_fragment_byte_order = this->byte_order;
this->first_fragment_giop_version = this->giop_version;
this->first_fragment_message_type = this->message_type;
- this->fragments_end = this->fragments_begin = current;
+ // this->fragments_end = this->fragments_begin = current;
+ this->fragmented_messages.copy (current_buf.rd_ptr (),
+ current_buf.length ());
+
+ // Reset the buffer
+ current_buf.reset ();
+
+ // Reset our state
this->reset ();
return 0;
}
- return this->append_fragment (current);
+ return this->append_fragment (current_buf);
}
- if (this->fragments_begin != 0)
+ if (this->fragmented_messages.length () != 0)
{
// This is the last message, but we must defragment before
// sending
-
- ACE_Message_Block* current =
- this->cdr.steal_contents ();
- if (this->append_fragment (current) == -1)
+ if (this->append_fragment (current_buf) == -1)
return -1;
- // Copy the entire chain into the input CDR.....
- this->cdr.reset (this->fragments_begin,
- this->first_fragment_byte_order);
- ACE_Message_Block::release (this->fragments_begin);
- this->fragments_begin = 0;
- this->fragments_end = 0;
+ // Copy the entire message block into <current_buf>
+ current_buf.data_block (this->fragmented_messages.data_block ()->clone ());
+
+ this->fragmented_messages.reset ();
this->byte_order = this->first_fragment_byte_order;
this->giop_version = this->first_fragment_giop_version;
this->message_type = this->first_fragment_message_type;
- /*FALLTHROUGH*/
+ // This message has no more fragments, and there where no fragments
+ // before it, just return. Notice that current_buf has the
+ // *right* contents
}
- // else
- // {
- // This message has no more fragments, and there where no fragments
- // before it, just return... notice that this->cdr has the right
- // contents.
- // }
+
return 1;
}
int
-TAO_GIOP_Message_State::append_fragment (ACE_Message_Block* current)
+TAO_GIOP_Message_State::append_fragment (ACE_Message_Block& current)
{
- this->fragments_end->cont (current);
- this->fragments_end = this->fragments_end->cont ();
-
if (this->first_fragment_byte_order != this->byte_order
|| this->first_fragment_giop_version.major != this->giop_version.major
|| this->first_fragment_giop_version.minor != this->giop_version.minor)
{
- // Yes, print it out in all debug levels!
- // @@ Bala: i know this code is mine, but could you check out
- // the spec and the latest CORBA 2.4 draft (ptc/00-03-02) to
- // verify if this is actually an error or not? If so, could you
- // please site the right section of the spec?
+ // Yes, print it out in all debug levels!. This is an error by
+ // CORBA 2.4 spec
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) incompatible fragments:\n"
- " Different GIOP versions or byte order\n"));
+ ACE_TEXT ("TAO (%P|%t) incompatible fragments:\n")
+ ACE_TEXT (" Different GIOP versions or byte order\n")));
this->reset ();
return -1;
}
+
+ size_t req_size =
+ this->fragmented_messages.size () + current.length ();
+
+ this->fragmented_messages.size (req_size);
+
+ // Copy the message
+ this->fragmented_messages.copy (current.rd_ptr (),
+ current.length ());
+
+ current.reset ();
+
+ // Reset our state
this->reset ();
+
return 0;
}
diff --git a/TAO/tao/GIOP_Message_State.h b/TAO/tao/GIOP_Message_State.h
index c4c492c56c0..18e1abf2816 100644
--- a/TAO/tao/GIOP_Message_State.h
+++ b/TAO/tao/GIOP_Message_State.h
@@ -95,7 +95,7 @@ public:
/// Check if the current message is complete, adjusting the fragments
/// if required...
- int is_complete (void);
+ int is_complete (ACE_Message_Block &current_buf);
/// Version info
TAO_GIOP_Version giop_version;
@@ -112,8 +112,8 @@ public:
/// in byte_order!
CORBA::ULong message_size;
- /// How much of the payload has been received
- CORBA::ULong current_offset;
+ /// Request Id from the Fragment header
+ CORBA::ULong request_id;
/// This is the InputCDR that will be used to decode the message.
TAO_InputCDR cdr;
@@ -121,10 +121,11 @@ public:
/**
* The fragments are collected in a chain of message blocks (using
* the cont() field). When the complete message is received the
- * chain is reassembled into <cdr>
+ * chain is reassembled into the main message block that is sent
+ * along
*/
- ACE_Message_Block* fragments_begin;
- ACE_Message_Block* fragments_end;
+ ACE_Message_Block fragmented_messages;
+
/**
* The byte order for the the first fragment
@@ -155,7 +156,7 @@ public:
private:
/// Append <current> to the list of fragments
/// Also resets the state, because the current message was consumed.
- int append_fragment (ACE_Message_Block* current);
+ int append_fragment (ACE_Message_Block &current);
};
diff --git a/TAO/tao/GIOP_Message_State.i b/TAO/tao/GIOP_Message_State.i
index 90fc3299dea..38d3ce25ae5 100644
--- a/TAO/tao/GIOP_Message_State.i
+++ b/TAO/tao/GIOP_Message_State.i
@@ -71,7 +71,6 @@ ACE_INLINE void
TAO_GIOP_Message_State::reset (int reset_contents)
{
this->message_size = 0;
- this->current_offset = 0;
this->more_fragments = 0;
if (reset_contents)
this->cdr.reset_contents ();
diff --git a/TAO/tao/IIOP_Connection_Handler.cpp b/TAO/tao/IIOP_Connection_Handler.cpp
index 4c32b6ffe0c..742aecc2361 100644
--- a/TAO/tao/IIOP_Connection_Handler.cpp
+++ b/TAO/tao/IIOP_Connection_Handler.cpp
@@ -93,6 +93,9 @@ TAO_IIOP_Connection_Handler::open (void*)
return -1;
#endif /* ! ACE_LACKS_TCP_NODELAY */
+ if (this->peer ().enable (ACE_NONBLOCK) == -1)
+ return -1;
+
// Called by the <Strategy_Acceptor> when the handler is
// completely connected.
ACE_INET_Addr addr;
diff --git a/TAO/tao/IIOP_Transport.cpp b/TAO/tao/IIOP_Transport.cpp
index 21507a6f672..ac6770dea28 100644
--- a/TAO/tao/IIOP_Transport.cpp
+++ b/TAO/tao/IIOP_Transport.cpp
@@ -37,13 +37,13 @@ TAO_IIOP_Transport::TAO_IIOP_Transport (TAO_IIOP_Connection_Handler *handler,
messaging_object_ (0),
bidirectional_flag_ (-1)
{
- if (flag)
+ /* if (flag)
{
// Use the lite version of the protocol
ACE_NEW (this->messaging_object_,
TAO_GIOP_Message_Lite (orb_core));
}
- else
+ else*/
{
// Use the normal GIOP object
ACE_NEW (this->messaging_object_,
@@ -182,8 +182,13 @@ TAO_IIOP_Transport::read_process_message (ACE_Time_Value *max_wait_time,
// Now we know that we have been able to read the complete message
// here..
- return this->process_message ();
+ result = 2; // Dummy
+ while (result > 1)
+ {
+ result = this->process_message ();
+ }
+ return result;
}
@@ -482,8 +487,9 @@ TAO_IIOP_Transport::process_message (void)
return -1;
}
- this->messaging_object_->reset ();
- return 1;
+ // this->messaging_object_->reset ();
+
+ return this->messaging_object_->more_messages ();
}
diff --git a/TAO/tao/Pluggable_Messaging.h b/TAO/tao/Pluggable_Messaging.h
index 4d475e51233..1d1765f85a4 100644
--- a/TAO/tao/Pluggable_Messaging.h
+++ b/TAO/tao/Pluggable_Messaging.h
@@ -135,11 +135,13 @@ public:
TAO_OutputCDR &cdr,
TAO_Pluggable_Reply_Params &params,
CORBA::Exception &x) = 0;
- // Generate a reply message with the exception <ex>.
+ /// Is the messaging object ready for processing BiDirectional
+ /// request/response?
virtual int is_ready_for_bidirectional (void) = 0;
- // Is the messaging object ready for processing BiDirectional
- // request/response?
+
+ /// Are there any more messages that needs processing?
+ virtual int more_messages (void) = 0;
};
#if defined (__ACE_INLINE__)
diff --git a/TAO/tao/TAO.dsp b/TAO/tao/TAO.dsp
index 9450a5b9cb2..b91b5b62ead 100644
--- a/TAO/tao/TAO.dsp
+++ b/TAO/tao/TAO.dsp
@@ -1335,25 +1335,6 @@ SOURCE=.\GIOP_Message_Handler.cpp
# End Source File
# Begin Source File
-SOURCE=.\GIOP_Message_Lite.cpp
-
-!IF "$(CFG)" == "TAO DLL - Win32 Alpha Release"
-
-!ELSEIF "$(CFG)" == "TAO DLL - Win32 Alpha Debug"
-
-!ELSEIF "$(CFG)" == "TAO DLL - Win32 MFC Release"
-
-!ELSEIF "$(CFG)" == "TAO DLL - Win32 MFC Debug"
-
-!ELSEIF "$(CFG)" == "TAO DLL - Win32 Release"
-
-!ELSEIF "$(CFG)" == "TAO DLL - Win32 Debug"
-
-!ENDIF
-
-# End Source File
-# Begin Source File
-
SOURCE=.\GIOP_Message_Locate_Header.cpp
!IF "$(CFG)" == "TAO DLL - Win32 Alpha Release"