summaryrefslogtreecommitdiff
path: root/TAO/tao
diff options
context:
space:
mode:
authorbala <bala@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2001-07-23 16:47:59 +0000
committerbala <bala@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2001-07-23 16:47:59 +0000
commitf03470a0da6af5715b281aca5826be848c7a7c2f (patch)
treea676e62438b1f2d1d4dd00c7492d51e1fe7dce94 /TAO/tao
parent05ad64bb238e59fddd588743e0412d374e2de787 (diff)
downloadATCD-f03470a0da6af5715b281aca5826be848c7a7c2f.tar.gz
ChangeLogTag: Mon Jul 23 11:44:30 2001 Balachandran Natarajan <bala@cs.wustl.edu>
Diffstat (limited to 'TAO/tao')
-rw-r--r--TAO/tao/Connection_Handler.h2
-rw-r--r--TAO/tao/GIOP_Message_Base.cpp52
-rw-r--r--TAO/tao/GIOP_Message_Base.h6
-rw-r--r--TAO/tao/GIOP_Message_Generator_Parser_12.cpp15
-rw-r--r--TAO/tao/GIOP_Message_Lite.cpp625
-rw-r--r--TAO/tao/GIOP_Message_Lite.h104
-rw-r--r--TAO/tao/IIOP_Transport.cpp6
-rw-r--r--TAO/tao/Incoming_Message_Queue.cpp14
-rw-r--r--TAO/tao/Incoming_Message_Queue.h18
-rw-r--r--TAO/tao/Incoming_Message_Queue.inl38
-rw-r--r--TAO/tao/Makefile1
-rw-r--r--TAO/tao/Pluggable_Messaging.h6
-rw-r--r--TAO/tao/Strategies/DIOP_Transport.cpp22
-rw-r--r--TAO/tao/Strategies/SHMIOP_Transport.cpp5
-rw-r--r--TAO/tao/Strategies/UIOP_Transport.cpp8
-rw-r--r--TAO/tao/TAO.dsp12
-rw-r--r--TAO/tao/Transport.cpp82
-rw-r--r--TAO/tao/Transport.h6
-rw-r--r--TAO/tao/Transport.inl3
-rw-r--r--TAO/tao/orbconf.h13
20 files changed, 709 insertions, 329 deletions
diff --git a/TAO/tao/Connection_Handler.h b/TAO/tao/Connection_Handler.h
index 9699d5e5414..ae74b4f8ad1 100644
--- a/TAO/tao/Connection_Handler.h
+++ b/TAO/tao/Connection_Handler.h
@@ -6,7 +6,7 @@
*
* $Id$
*
- * @author Bala Natarajan <bala@cs.wustl.edu>
+ * @author Balachandran Natarajan <bala@cs.wustl.edu>
*/
//=============================================================================
diff --git a/TAO/tao/GIOP_Message_Base.cpp b/TAO/tao/GIOP_Message_Base.cpp
index c9c4fe10904..3df80faebbc 100644
--- a/TAO/tao/GIOP_Message_Base.cpp
+++ b/TAO/tao/GIOP_Message_Base.cpp
@@ -1,5 +1,6 @@
// $Id$
+
#include "GIOP_Message_Base.h"
#include "operation_details.h"
#include "GIOP_Utils.h"
@@ -18,6 +19,7 @@
ACE_RCSID (tao, GIOP_Message_Base, "$Id$")
+
TAO_GIOP_Message_Base::TAO_GIOP_Message_Base (TAO_ORB_Core *orb_core,
size_t /*input_cdr_size*/)
: orb_core_ (orb_core),
@@ -46,7 +48,7 @@ TAO_GIOP_Message_Base::init (CORBA::Octet major,
void
-TAO_GIOP_Message_Base::reset (int /* reset_flag */)
+TAO_GIOP_Message_Base::reset (void)
{
// no-op
}
@@ -254,10 +256,14 @@ TAO_GIOP_Message_Base::message_type (
case TAO_GIOP_CLOSECONNECTION:
return TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION;
+ case TAO_GIOP_FRAGMENT:
+ return TAO_PLUGGABLE_MESSAGE_FRAGMENT;
+
case TAO_GIOP_CANCELREQUEST:
case TAO_GIOP_MESSAGERROR:
- case TAO_GIOP_FRAGMENT:
// Never happens: why??
+
+
default:
ACE_ERROR ((LM_ERROR,
ACE_TEXT ("TAO (%P|%t) %N:%l message_type : ")
@@ -386,8 +392,7 @@ TAO_GIOP_Message_Base::consolidate_node (TAO_Queued_Data *qd,
// Copy the pay load..
// Calculate the bytes that needs to be copied in the queue...
- size_t copy_len =
- state.message_size () - TAO_GIOP_MESSAGE_HEADER_LEN;
+ size_t copy_len = state.payload_size ();
// If teh data that needs to be copied is more than that is
// available to us ..
@@ -449,6 +454,42 @@ TAO_GIOP_Message_Base::consolidate_node (TAO_Queued_Data *qd,
}
+int
+TAO_GIOP_Message_Base::consolidate_fragments (TAO_Queued_Data *dqd,
+ const TAO_Queued_Data *sqd)
+{
+ if (dqd->byte_order_ != sqd->byte_order_
+ || dqd->major_version_ != sqd->major_version_
+ || dqd->minor_version_ != sqd->minor_version_)
+ {
+ // Yes, print it out in all debug levels!. This is an error by
+ // CORBA 2.4 spec
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) incompatible fragments:")
+ ACE_TEXT ("different GIOP versions or byte order\n")));
+ return -1;
+ }
+
+ // Skip the header in the incoming message
+ sqd->msg_block_->rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN);
+
+ // If we have a fragment header skip the header length too..
+ if (sqd->minor_version_ == 2)
+ sqd->msg_block_->rd_ptr (TAO_GIOP_MESSAGE_FRAGMENT_HEADER);
+
+ // Get the length of the incoming message block..
+ int incoming_size = sqd->msg_block_->length ();
+
+ // Increase the size of the destination message block
+ dqd->msg_block_->size (incoming_size);
+
+ // Copy the data
+ dqd->msg_block_->copy (sqd->msg_block_->rd_ptr (),
+ incoming_size);
+
+ return 0;
+}
+
void
TAO_GIOP_Message_Base::get_message_data (TAO_Queued_Data *qd)
{
@@ -460,6 +501,9 @@ TAO_GIOP_Message_Base::get_message_data (TAO_Queued_Data *qd)
qd->minor_version_ =
this->message_state_.giop_version_.minor;
+ qd->more_fragments_ =
+ this->message_state_.more_fragments_;
+
qd->msg_type_=
this->message_type (this->message_state_);
diff --git a/TAO/tao/GIOP_Message_Base.h b/TAO/tao/GIOP_Message_Base.h
index 028deebdb5f..6d5337be3e9 100644
--- a/TAO/tao/GIOP_Message_Base.h
+++ b/TAO/tao/GIOP_Message_Base.h
@@ -58,7 +58,7 @@ public:
CORBA::Octet minor);
/// Reset the messaging the object
- virtual void reset (int reset_flag = 1);
+ virtual void reset (void);
/// Write the RequestHeader in to the <cdr> stream. The underlying
/// implementation of the mesaging should do the right thing.
@@ -114,6 +114,10 @@ public:
/// Get the details of the message parsed through the <qd>.
virtual void get_message_data (TAO_Queued_Data *qd);
+ /// @@Bala:Docu??
+ virtual int consolidate_fragments (TAO_Queued_Data *dqd,
+ const TAO_Queued_Data *sqd);
+
/// Process the request message that we have received on the
/// connection
virtual int process_request_message (TAO_Transport *transport,
diff --git a/TAO/tao/GIOP_Message_Generator_Parser_12.cpp b/TAO/tao/GIOP_Message_Generator_Parser_12.cpp
index 4f6f5b054cb..c42fa68ae90 100644
--- a/TAO/tao/GIOP_Message_Generator_Parser_12.cpp
+++ b/TAO/tao/GIOP_Message_Generator_Parser_12.cpp
@@ -206,6 +206,11 @@ TAO_GIOP_Message_Generator_Parser_12::write_locate_reply_mesg (
// Make the header for the locate request
output.write_ulong (status_info.status);
+ // Note: We dont align the pointer to an 8 byte boundary for a
+ // locate reply body. This is due to an urgent issue raised by Michi
+ // in the OMG. I discussed this with Michi today (09/07/2001) and I
+ // learn that this has been passed. Hence the change..
+ /*
if (status_info.status == TAO_GIOP_OBJECT_FORWARD ||
status_info.status == TAO_GIOP_OBJECT_FORWARD_PERM)
{
@@ -215,7 +220,7 @@ TAO_GIOP_Message_Generator_Parser_12::write_locate_reply_mesg (
return 0;
}
}
-
+ */
switch (status_info.status)
{
case TAO_GIOP_OBJECT_FORWARD:
@@ -467,11 +472,15 @@ TAO_GIOP_Message_Generator_Parser_12::parse_locate_reply (
return -1;
- if (cdr.length () > 0)
+ // Note: We dont align the pointer to an 8 byte boundary for a
+ // locate reply body. This is due to an urgent issue raised by Michi
+ // in the OMG. I discussed this with Michi today (09/07/2001) and I
+ // learn that this has been passed. Hence the change..
+ /*if (cdr.length () > 0)
{
// Align the read pointer on an 8-byte boundary
cdr.align_read_ptr (TAO_GIOP_MESSAGE_ALIGN_PTR);
- }
+ }*/
// Steal the contents in to the reply CDR and loose ownership of the
// data block.
diff --git a/TAO/tao/GIOP_Message_Lite.cpp b/TAO/tao/GIOP_Message_Lite.cpp
index d89da3c9653..c9b42560240 100644
--- a/TAO/tao/GIOP_Message_Lite.cpp
+++ b/TAO/tao/GIOP_Message_Lite.cpp
@@ -23,60 +23,17 @@ static const size_t TAO_GIOP_LITE_MESSAGE_SIZE_OFFSET = 0;
static const size_t TAO_GIOP_LITE_MESSAGE_TYPE_OFFSET = 4;
TAO_GIOP_Message_Lite::TAO_GIOP_Message_Lite (TAO_ORB_Core *orb_core,
- size_t input_cdr_size)
- :message_state_ (orb_core),
- output_ (0),
- cdr_buffer_alloc_ (
- orb_core->resource_factory ()->output_cdr_buffer_allocator ()
- ),
- cdr_dblock_alloc_ (
- orb_core->resource_factory ()->output_cdr_dblock_allocator ()
- ),
- cdr_msgblock_alloc_ (
- orb_core->resource_factory ()->output_cdr_msgblock_allocator ()
- ),
- input_cdr_ (orb_core->create_input_cdr_data_block (input_cdr_size),
- 0,
- TAO_ENCAP_BYTE_ORDER,
- TAO_DEF_GIOP_MAJOR,
- TAO_DEF_GIOP_MINOR,
- orb_core),
- current_offset_ (0)
+ size_t /*input_cdr_size*/)
+ : orb_core_ (orb_core),
+ message_type_ (0),
+ message_size_ (0),
+ byte_order_ (ACE_CDR_BYTE_ORDER)
{
-#if defined (ACE_HAS_PURIFY)
- (void) ACE_OS::memset (this->repbuf_,
- '\0',
- sizeof this->repbuf_);
-#endif /* ACE_HAS_PURIFY */
- ACE_NEW (this->output_,
- TAO_OutputCDR (this->repbuf_,
- sizeof this->repbuf_,
- TAO_ENCAP_BYTE_ORDER,
- this->cdr_buffer_alloc_,
- this->cdr_dblock_alloc_,
- this->cdr_msgblock_alloc_,
- orb_core->orb_params ()->cdr_memcpy_tradeoff (),
- TAO_DEF_GIOP_MAJOR,
- TAO_DEF_GIOP_MINOR,
- orb_core->to_iso8859 (),
- orb_core->to_unicode ()));
}
TAO_GIOP_Message_Lite::~TAO_GIOP_Message_Lite (void)
{
- // Explicitly call the destructor of the output CDR first. They need
- // the allocators during destruction.
- delete this->output_;
-
- // Then call the destructor of our allocators
- if (this->cdr_dblock_alloc_ != 0)
- this->cdr_dblock_alloc_->remove ();
- // delete this->cdr_dblock_alloc_;
-
- if (this->cdr_buffer_alloc_ != 0)
- this->cdr_buffer_alloc_->remove ();
- // delete this->cdr_buffer_alloc_;
}
@@ -87,40 +44,11 @@ TAO_GIOP_Message_Lite::init (CORBA::Octet,
return;
}
-int
-TAO_GIOP_Message_Lite::parse_header (void)
-{
- // Get the read pointer
- char *buf = this->input_cdr_.rd_ptr ();
-
- // @@ Bala: i added the following comment, does it make sense?
- // In GIOPLite the version, byte order info, etc. are hardcoded, and
- // not transmitted over the wire.
- this->message_state_.byte_order = TAO_ENCAP_BYTE_ORDER;
- this->message_state_.giop_version.major = TAO_DEF_GIOP_MAJOR;
- this->message_state_.giop_version.minor = TAO_DEF_GIOP_MINOR;
-
- // Get the message type.
- this->message_state_.message_type = buf[TAO_GIOP_LITE_MESSAGE_TYPE_OFFSET];
-
- this->input_cdr_.reset_byte_order (this->message_state_.byte_order);
-
- // The first bytes are the length of the message.
- this->input_cdr_.read_ulong (this->message_state_.message_size);
-
- return 0;
-}
-
-
void
-TAO_GIOP_Message_Lite::reset (int reset_flag)
+TAO_GIOP_Message_Lite::reset (void)
{
- // Reset the message state
- this->message_state_.reset (reset_flag);
-
- if (reset_flag)
- this->input_cdr_.reset_contents ();
- //What else???
+ this->message_type_ = 0;
+ this->message_size_ = 0;
}
@@ -222,105 +150,10 @@ TAO_GIOP_Message_Lite::generate_reply_header (
int
-TAO_GIOP_Message_Lite::read_message (TAO_Transport *transport,
+TAO_GIOP_Message_Lite::read_message (TAO_Transport * /*transport*/,
int /*block */,
- ACE_Time_Value *max_wait_time)
+ ACE_Time_Value * /*max_wait_time*/)
{
- if (this->message_state_.header_received () == 0)
- {
- int retval =
- TAO_GIOP_Utils::read_bytes_input (transport,
- this->input_cdr_,
- TAO_GIOP_LITE_HEADER_LEN ,
- max_wait_time);
- if (retval == -1)
- {
- if (TAO_debug_level > 0)
- {
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - \n")
- ACE_TEXT ("TAO_GIOP_Message_Lite::read_message \n")));
- }
-
- return -1;
- }
-
- // Read the rest of the stuff. That should be read by the
- // corresponding states
- if (this->parse_header () == -1)
- {
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t|%N%l) -\n"),
- ACE_TEXT ("TAO_GIOP_Message_Lite::handle_input \n")));
- return -1;
- }
-
- if (this->input_cdr_.grow (TAO_GIOP_LITE_HEADER_LEN +
- this->message_state_.message_size) == -1)
- {
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t|%N|%l) - %p\n"),
- ACE_TEXT ("ACE_CDR::grow")));
- return -1;
- }
-
- // Growing the buffer may have reset the rd_ptr(), but we want
- // to leave it just after the GIOP header (that was parsed
- // already);
- this->input_cdr_.skip_bytes (TAO_GIOP_LITE_HEADER_LEN);
- }
-
- size_t missing_data =
- this->message_state_.message_size - this->current_offset_;
-
- ssize_t n =
- TAO_GIOP_Utils::read_buffer (transport,
- this->input_cdr_.rd_ptr ()
- + this->current_offset_,
- missing_data,
- max_wait_time);
-
- if (n == -1)
- {
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - %p\n"),
- ACE_TEXT ("TAO_GIOP_Message_Lite::handle_input, read_buffer[1] \n")));
- return -1;
- }
- else if (n == 0)
- {
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - %p\n"),
- ACE_TEXT ("TAO_GIOP_Message_Lite::handle_input, read_buffer[2]\n")));
- return -1;
- }
-
- this->current_offset_ += n;
-
- if (this->current_offset_ ==
- this->message_state_.message_size)
- {
- if (TAO_debug_level >= 4)
- {
- size_t header_len = TAO_GIOP_LITE_HEADER_LEN ;
-
- char *buf = this->input_cdr_.rd_ptr ();
- buf -= header_len;
- size_t msg_len = this->input_cdr_.length () + header_len;
- this->dump_msg ("recv",
- ACE_reinterpret_cast (u_char *,
- buf),
- msg_len);
- }
- }
-
- if (this->current_offset_ != this->message_state_.message_size)
- return 0;
-
return 1;
}
@@ -394,16 +227,46 @@ TAO_GIOP_Message_Lite::format_message (TAO_OutputCDR &stream)
}
+int
+TAO_GIOP_Message_Lite::parse_incoming_messages (ACE_Message_Block &block)
+{
+ // Get the read pointer
+ char *buf = block.rd_ptr ();
+
+ CORBA::ULong x = 0;
+#if !defined (ACE_DISABLE_SWAP_ON_READ)
+ if (!(this->byte_order_ != ACE_CDR_BYTE_ORDER))
+ {
+ x = *ACE_reinterpret_cast (ACE_CDR::ULong*, buf);
+ }
+ else
+ {
+ ACE_CDR::swap_4 (buf, ACE_reinterpret_cast (char*, &x));
+ }
+#else
+ x = *ACE_reinterpret_cast (ACE_CDR::ULong*, buf);
+#endif /* ACE_DISABLE_SWAP_ON_READ */
+
+ this->message_size_ = x;
+
+ // Get the message type.
+ this->message_type_ = buf[TAO_GIOP_LITE_MESSAGE_TYPE_OFFSET];
+
+ return 0;
+}
+
TAO_Pluggable_Message_Type
TAO_GIOP_Message_Lite::message_type (void)
{
- switch (this->message_state_.message_type)
+ switch (this->message_type_)
{
case TAO_GIOP_REQUEST:
- case TAO_GIOP_LOCATEREQUEST:
return TAO_PLUGGABLE_MESSAGE_REQUEST;
+ case TAO_GIOP_LOCATEREQUEST:
+ return TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST;
case TAO_GIOP_LOCATEREPLY:
+ return TAO_PLUGGABLE_MESSAGE_LOCATEREPLY;
case TAO_GIOP_REPLY:
return TAO_PLUGGABLE_MESSAGE_REPLY;
@@ -425,54 +288,265 @@ TAO_GIOP_Message_Lite::message_type (void)
}
+ssize_t
+TAO_GIOP_Message_Lite::missing_data (ACE_Message_Block &block)
+{
+ // Actual message size including the header..
+ CORBA::ULong msg_size =
+ this->message_size_ + TAO_GIOP_LITE_HEADER_LEN;
+
+ size_t len = block.length ();
+
+ if (len > msg_size)
+ {
+ return -1;
+ }
+ else if (len == msg_size)
+ return 0;
+
+ return msg_size - len;
+}
+
+
int
-TAO_GIOP_Message_Lite::process_request_message (TAO_Transport *transport,
- TAO_ORB_Core *orb_core)
+TAO_GIOP_Message_Lite::extract_next_message (ACE_Message_Block &incoming,
+ TAO_Queued_Data *&qd)
{
- // Set the upcall thread
- orb_core->lf_strategy ().set_upcall_thread (orb_core->leader_follower ());
+ if (incoming.length () < TAO_GIOP_LITE_HEADER_LEN)
+ {
+ if (incoming.length () > 0)
+ {
+ // Make a node which has a message block of the size of
+ // MESSAGE_HEADER_LEN.
+ qd =
+ this->make_queued_data (TAO_GIOP_LITE_HEADER_LEN);
+
+ qd->msg_block_->copy (incoming.rd_ptr (),
+ incoming.length ());
+ qd->missing_data_ = -1;
+ }
+ return 0;
+ }
- // Reset the output CDR stream.
- // @@@@Is it necessary here?
- this->output_->reset ();
+ if (this->parse_incoming_messages (incoming) == -1)
+ {
+ return -1;
+ }
- //
- // Take out all the information from the <message_state> and reset
- // it so that nested upcall on the same transport can be handled.
- //
+ size_t copying_len =
+ this->message_size_ + TAO_GIOP_LITE_HEADER_LEN;
- // Notice that the message_state is only modified in one thread at a
- // time because the reactor does not call handle_input() for the
- // same Event_Handler in two threads at the same time.
+ qd = this->make_queued_data (copying_len);
- // Steal the input CDR from the message state.
- TAO_InputCDR input_cdr (ACE_InputCDR::Transfer_Contents (this->input_cdr_),
- orb_core);
+ if (copying_len > incoming.length ())
+ {
+ qd->missing_data_ =
+ copying_len - incoming.length ();
- // Send the message state for the service layer like FT to log the
- // messages
- // @@@ Needed for DOORS
- // orb_core->services_log_msg_rcv (this->message_state_);
+ copying_len = incoming.length ();
+ }
+
+ qd->msg_block_->copy (incoming.rd_ptr (),
+ copying_len);
+
+ incoming.rd_ptr (copying_len);
+ qd->byte_order_ = TAO_ENCAP_BYTE_ORDER;
+ qd->major_version_ = TAO_DEF_GIOP_MAJOR;
+ qd->minor_version_ = TAO_DEF_GIOP_MINOR;
+ qd->msg_type_ = this->message_type ();
+ return 1;
+}
+
+int
+TAO_GIOP_Message_Lite::consolidate_node (TAO_Queued_Data *qd,
+ ACE_Message_Block &incoming)
+{
+ // Look to see whether we had atleast parsed the GIOP header ...
+ if (qd->missing_data_ == -1)
+ {
+ // The data length that has been stuck in there during the last
+ // read ....
+ size_t len =
+ qd->msg_block_->length ();
+
+ // We know that we would have space for
+ // TAO_GIOP_MESSAGE_HEADER_LEN here. So copy that much of data
+ // from the <incoming> into the message block in <qd>
+ qd->msg_block_->copy (incoming.rd_ptr (),
+ TAO_GIOP_LITE_HEADER_LEN - len);
+
+ // Move the rd_ptr () in the incoming message block..
+ incoming.rd_ptr (TAO_GIOP_LITE_HEADER_LEN - len);
+
+ // Parse the message header now...
+ if (this->parse_incoming_messages (*qd->msg_block_) == -1)
+ return -1;
+
+ // Now grow the message block so that we can copy the rest of
+ // the data...
+ ACE_CDR::grow (qd->msg_block_,
+ this->message_size_ + TAO_GIOP_LITE_HEADER_LEN);
+
+ // Copy the pay load..
+
+ // Calculate the bytes that needs to be copied in the queue...
+ size_t copy_len = this->message_size_;
+
+ // If teh data that needs to be copied is more than that is
+ // available to us ..
+ if (copy_len > incoming.length ())
+ {
+ // Calculate the missing data..
+ qd->missing_data_ =
+ copy_len - incoming.length ();
+
+ // Set the actual possible copy_len that is available...
+ copy_len = incoming.length ();
+ }
+ else
+ {
+ qd->missing_data_ = 0;
+ }
+
+ // ..now we are set to copy the right amount of data to the
+ // node..
+ qd->msg_block_->copy (incoming.rd_ptr (),
+ copy_len);
+
+ // Set the <rd_ptr> of the <incoming>..
+ incoming.rd_ptr (copy_len);
+
+ // Get the other details...
+ qd->byte_order_ = TAO_ENCAP_BYTE_ORDER;
+ qd->major_version_ = TAO_DEF_GIOP_MAJOR;
+ qd->minor_version_ = TAO_DEF_GIOP_MINOR;
+ qd->msg_type_ = this->message_type ();
+ }
+ else
+ {
+ // @@todo: Need to abstract this out to a seperate method...
+ size_t copy_len = qd->missing_data_;
+
+ if (copy_len > incoming.length ())
+ {
+ // Calculate the missing data..
+ qd->missing_data_ =
+ copy_len - incoming.length ();
+
+ // Set the actual possible copy_len that is available...
+ copy_len = incoming.length ();
+ }
+
+ // Copy the right amount of data in to the node...
+ // node..
+ qd->msg_block_->copy (incoming.rd_ptr (),
+ copy_len);
+
+ // Set the <rd_ptr> of the <incoming>..
+ qd->msg_block_->rd_ptr (copy_len);
+
+ }
+
+ return 0;
+}
+
+
+void
+TAO_GIOP_Message_Lite::get_message_data (TAO_Queued_Data *qd)
+{
+ // Get the message information
+ qd->byte_order_ =
+ ACE_CDR_BYTE_ORDER;
+ qd->major_version_ =
+ TAO_DEF_GIOP_MAJOR;
+ qd->minor_version_ =
+ TAO_DEF_GIOP_MINOR;
+
+ qd->msg_type_=
+ this->message_type ();
+
+ this->reset ();
+}
+
+int
+TAO_GIOP_Message_Lite::consolidate_fragments (TAO_Queued_Data * /*dqd*/,
+ const TAO_Queued_Data */*sqd*/)
+{
+ // We dont know what fragments are???
+ return -1;
+}
+
+int
+TAO_GIOP_Message_Lite::process_request_message (TAO_Transport *transport,
+ TAO_Queued_Data *qd)
+{
+ // Set the upcall thread
+ this->orb_core_->lf_strategy ().set_upcall_thread (
+ this->orb_core_->leader_follower ());
+
+ // A buffer that we will use to initialise the CDR stream
+ char repbuf[ACE_CDR::DEFAULT_BUFSIZE];
+
+#if defined(ACE_HAS_PURIFY)
+ (void) ACE_OS::memset (repbuf,
+ '\0',
+ sizeof repbuf);
+#endif /* ACE_HAS_PURIFY */
+
+ // Initialze an output CDR on the stack
+ TAO_OutputCDR output (repbuf,
+ sizeof repbuf,
+ TAO_ENCAP_BYTE_ORDER,
+ this->orb_core_->output_cdr_buffer_allocator (),
+ this->orb_core_->output_cdr_dblock_allocator (),
+ this->orb_core_->output_cdr_msgblock_allocator (),
+ this->orb_core_->orb_params ()->cdr_memcpy_tradeoff (),
+ qd->major_version_,
+ qd->minor_version_,
+ this->orb_core_->to_iso8859 (),
+ this->orb_core_->to_unicode ());
+
+ // Get the read and write positions before we steal data.
+ size_t rd_pos = qd->msg_block_->rd_ptr () - qd->msg_block_->base ();
+ size_t wr_pos = qd->msg_block_->wr_ptr () - qd->msg_block_->base ();
+ rd_pos += TAO_GIOP_LITE_HEADER_LEN;
+
+ this->dump_msg ("recv",
+ ACE_reinterpret_cast (u_char *,
+ qd->msg_block_->rd_ptr ()),
+ qd->msg_block_->length ());
+
+
+ // Create a input CDR stream.
+ // NOTE: We use the same data block in which we read the message and
+ // we pass it on to the higher layers of the ORB. So we dont to any
+ // copies at all here. The same is also done in the higher layers.
+
+ TAO_InputCDR input_cdr (qd->msg_block_->data_block (),
+ ACE_Message_Block::DONT_DELETE,
+ rd_pos,
+ wr_pos,
+ qd->byte_order_,
+ qd->major_version_,
+ qd->minor_version_,
+ this->orb_core_);
- // Reset the message state. Now, we are ready for the next nested
- // upcall if any.
- this->message_state_.reset (0);
// We know we have some request message. Check whether it is a
// GIOP_REQUEST or GIOP_LOCATE_REQUEST to take action.
- switch (this->message_state_.message_type)
+ switch (qd->msg_type_)
{
- case TAO_GIOP_REQUEST:
+ case TAO_PLUGGABLE_MESSAGE_REQUEST:
// Should be taken care by the state specific invocations. They
// could raise an exception or write things in the output CDR
// stream
return this->process_request (transport,
- orb_core,
- input_cdr);
- case TAO_GIOP_LOCATEREQUEST:
+ input_cdr,
+ output);
+ case TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST:
return this->process_locate_request (transport,
- orb_core,
- input_cdr);
+ input_cdr,
+ output);
default:
return -1;
}
@@ -480,22 +554,59 @@ TAO_GIOP_Message_Lite::process_request_message (TAO_Transport *transport,
int
TAO_GIOP_Message_Lite::process_reply_message (
- TAO_Pluggable_Reply_Params &params,
- TAO_Queued_Data * /* qd */
+ TAO_Pluggable_Reply_Params &params,
+ TAO_Queued_Data *qd
)
{
+
+
+ // Get the read and write positions before we steal data.
+ size_t rd_pos = qd->msg_block_->rd_ptr () - qd->msg_block_->base ();
+ size_t wr_pos = qd->msg_block_->wr_ptr () - qd->msg_block_->base ();
+ rd_pos += TAO_GIOP_LITE_HEADER_LEN;
+
+ this->dump_msg ("recv",
+ ACE_reinterpret_cast (u_char *,
+ qd->msg_block_->rd_ptr ()),
+ qd->msg_block_->length ());
+
+
+ // Create a empty buffer on stack
+ // NOTE: We use the same data block in which we read the message and
+ // we pass it on to the higher layers of the ORB. So we dont to any
+ // copies at all here. The same is alos done in the higher layers.
+ TAO_InputCDR input_cdr (qd->msg_block_->data_block (),
+ ACE_Message_Block::DONT_DELETE,
+ rd_pos,
+ wr_pos,
+ qd->byte_order_,
+ qd->major_version_,
+ qd->minor_version_,
+ this->orb_core_);
+
+ // Reset the message state. Now, we are ready for the next nested
+ // upcall if any.
+ // this->message_handler_.reset (0);
+
+ // We know we have some reply message. Check whether it is a
+ // GIOP_REPLY or GIOP_LOCATE_REPLY to take action.
+
+ // Once we send the InputCDR stream we need to just forget about
+ // the stream and never touch that again for anything. We basically
+ // loose ownership of the data_block.
+
// We know we have some reply message. Check whether it is a
// GIOP_REPLY or GIOP_LOCATE_REPLY to take action.
- switch (this->message_state_.message_type)
+ switch (qd->msg_type_)
{
case TAO_GIOP_REPLY:
// Should be taken care by the state specific parsing
- return this->parse_reply (this->input_cdr_,
+ return this->parse_reply (input_cdr,
params);
case TAO_GIOP_LOCATEREPLY:
// We call parse_reply () here because, the message format for
// the LOCATEREPLY & REPLY are same.
- return this->parse_reply (this->input_cdr_,
+ return this->parse_reply (input_cdr,
params);
default:
return -1;
@@ -569,16 +680,16 @@ TAO_GIOP_Message_Lite::write_protocol_header (
int
TAO_GIOP_Message_Lite::process_request (TAO_Transport *transport,
- TAO_ORB_Core *orb_core,
- TAO_InputCDR &cdr)
+ TAO_InputCDR &cdr,
+ TAO_OutputCDR &output)
{
// This will extract the request header, set <response_required>
// and <sync_with_server> as appropriate.
TAO_ServerRequest request (this,
cdr,
- *this->output_,
+ output,
transport,
- orb_core);
+ this->orb_core_);
CORBA::Environment &ACE_TRY_ENV = TAO_default_environment ();
@@ -603,16 +714,16 @@ TAO_GIOP_Message_Lite::process_request (TAO_Transport *transport,
CORBA::Object_var forward_to;
// Do this before the reply is sent.
- orb_core->adapter_registry ()->dispatch (request.object_key (),
- request,
- forward_to,
- ACE_TRY_ENV);
+ this->orb_core_->adapter_registry ()->dispatch (request.object_key (),
+ request,
+ forward_to,
+ ACE_TRY_ENV);
ACE_TRY_CHECK;
if (!CORBA::is_nil (forward_to.in ()))
{
// We should forward to another object...
- TAO_Pluggable_Reply_Params reply_params (orb_core);
+ TAO_Pluggable_Reply_Params reply_params (this->orb_core_);
reply_params.request_id_ = request_id;
reply_params.reply_status_ = TAO_GIOP_LOCATION_FORWARD;
reply_params.svc_ctx_.length (0);
@@ -621,12 +732,12 @@ TAO_GIOP_Message_Lite::process_request (TAO_Transport *transport,
reply_params.service_context_notowned (&request.reply_service_info ());
// Make the GIOP header and Reply header
- this->generate_reply_header (*this->output_,
+ this->generate_reply_header (output,
reply_params);
- *this->output_ << forward_to.in ();
+ output << forward_to.in ();
- int result = transport->send_message (*this->output_);
+ int result = transport->send_message (output);
if (result == -1)
{
if (TAO_debug_level > 0)
@@ -650,7 +761,7 @@ TAO_GIOP_Message_Lite::process_request (TAO_Transport *transport,
if (response_required)
{
result = this->send_reply_exception (transport,
- orb_core,
+ this->orb_core_,
request_id,
&request.reply_service_info (),
&ACE_ANY_EXCEPTION);
@@ -709,7 +820,7 @@ TAO_GIOP_Message_Lite::process_request (TAO_Transport *transport,
);
result = this->send_reply_exception (transport,
- orb_core,
+ this->orb_core_,
request_id,
&request.reply_service_info (),
&exception);
@@ -748,13 +859,13 @@ TAO_GIOP_Message_Lite::process_request (TAO_Transport *transport,
int
TAO_GIOP_Message_Lite::process_locate_request (TAO_Transport *transport,
- TAO_ORB_Core* orb_core,
- TAO_InputCDR &input)
+ TAO_InputCDR &input,
+ TAO_OutputCDR &output)
{
// This will extract the request header, set <response_required> as
// appropriate.
TAO_GIOP_Locate_Request_Header locate_request (input,
- orb_core);
+ this->orb_core_);
TAO_GIOP_Locate_Status_Msg status_info;
@@ -802,7 +913,7 @@ TAO_GIOP_Message_Lite::process_locate_request (TAO_Transport *transport,
"_non_existent",
dummy_output,
transport,
- orb_core,
+ this->orb_core_,
parse_error);
if (parse_error != 0)
@@ -813,10 +924,10 @@ TAO_GIOP_Message_Lite::process_locate_request (TAO_Transport *transport,
CORBA::Object_var forward_to;
- orb_core->adapter_registry ()->dispatch (server_request.object_key (),
- server_request,
- forward_to,
- ACE_TRY_ENV);
+ this->orb_core_->adapter_registry ()->dispatch (server_request.object_key (),
+ server_request,
+ forward_to,
+ ACE_TRY_ENV);
ACE_TRY_CHECK;
if (!CORBA::is_nil (forward_to.in ()))
@@ -877,6 +988,7 @@ TAO_GIOP_Message_Lite::process_locate_request (TAO_Transport *transport,
ACE_ENDTRY;
return this->make_send_locate_reply (transport,
+ output,
locate_request,
status_info);
}
@@ -885,6 +997,7 @@ TAO_GIOP_Message_Lite::process_locate_request (TAO_Transport *transport,
int
TAO_GIOP_Message_Lite::make_send_locate_reply (
TAO_Transport *transport,
+ TAO_OutputCDR &output,
TAO_GIOP_Locate_Request_Header &request,
TAO_GIOP_Locate_Status_Msg &status_info
)
@@ -893,15 +1006,15 @@ TAO_GIOP_Message_Lite::make_send_locate_reply (
// different from the reply header made by the make_reply () call..
// Make the GIOP message header
this->write_protocol_header (TAO_GIOP_LOCATEREPLY,
- *this->output_);
+ output);
// This writes the header & body
- this->write_locate_reply_mesg (*this->output_,
+ this->write_locate_reply_mesg (output,
request.request_id (),
status_info);
// Send the message
- int result = transport->send_message (*this->output_);
+ int result = transport->send_message (output);
// Print out message if there is an error
if (result == -1)
@@ -1454,6 +1567,38 @@ TAO_GIOP_Message_Lite::dump_msg (const char *label,
}
}
+TAO_Queued_Data *
+TAO_GIOP_Message_Lite::make_queued_data (size_t sz)
+{
+ // Get a node for the queue..
+ TAO_Queued_Data *qd =
+ TAO_Queued_Data::get_queued_data ();
+
+ // Make a datablock for the size requested + something. The
+ // "something" is required because we are going to align the data
+ // block in the message block. During alignment we could loose some
+ // bytes. As we may not know how many bytes will be lost, we will
+ // allocate ACE_CDR::MAX_ALIGNMENT extra.
+ ACE_Data_Block *db =
+ this->orb_core_->data_block_for_message_block (sz +
+ ACE_CDR::MAX_ALIGNMENT);
+
+ ACE_Allocator *alloc =
+ this->orb_core_->message_block_msgblock_allocator ();
+
+ ACE_Message_Block mb (db,
+ 0,
+ alloc);
+
+ ACE_Message_Block *new_mb = mb.duplicate ();
+
+ ACE_CDR::mb_align (new_mb);
+
+ qd->msg_block_ = new_mb;
+
+ return qd;
+}
+
int
TAO_GIOP_Message_Lite::generate_locate_reply_header (
TAO_OutputCDR & /*cdr*/,
diff --git a/TAO/tao/GIOP_Message_Lite.h b/TAO/tao/GIOP_Message_Lite.h
index 6086a4b11f3..8bd5202ca70 100644
--- a/TAO/tao/GIOP_Message_Lite.h
+++ b/TAO/tao/GIOP_Message_Lite.h
@@ -28,6 +28,7 @@
class TAO_Operation_Details;
class TAO_Pluggable_Reply_Params;
class TAO_GIOP_Locate_Request_Header;
+class TAO_Queued_Data;
/**
* @class TAO_GIOP_Message_Lite
@@ -53,7 +54,7 @@ public:
virtual void init (CORBA::Octet, CORBA::Octet);
/// Reset the messaging the object
- virtual void reset (int reset_flag = 1);
+ virtual void reset (void);
/// Write the RequestHeader in to the <cdr> stream. The underlying
/// implementation of the mesaging should do the right thing.
@@ -73,13 +74,7 @@ public:
TAO_Pluggable_Reply_Params_Base &params
);
- /// This method reads the message on the connection. Returns 0 when
- /// there is short read on the connection. Returns 1 when the full
- /// message is read and handled. Returns -1 on errors. If <block> is
- /// 1, then reply is read in a blocking manner. <bytes> indicates the
- /// number of bytes that needs to be read from the connection.
- /// GIOP uses this read to unmarshall the message details that appear
- /// on the connection.
+ /// Dummy method to ..
virtual int read_message (TAO_Transport *transport,
int block = 0,
ACE_Time_Value *max_wait_time = 0);
@@ -90,6 +85,8 @@ public:
/// the message.
virtual int format_message (TAO_OutputCDR &cdr);
+ /// Parse the incoming messages..
+ virtual int parse_incoming_messages (ACE_Message_Block &message_block);
/// Get the message type. The return value would be one of the
/// following:
@@ -97,14 +94,35 @@ public:
/// TAO_PLUGGABLE_MESSAGE_REPLY,
/// TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION,
/// TAO_PLUGGABLE_MESSAGE_MESSAGE_ERROR.
- virtual TAO_Pluggable_Message_Type message_type (void);
+ TAO_Pluggable_Message_Type message_type (void);
+ /// Calculate the amount of data that is missing in the <incoming>
+ /// message block.
+ virtual ssize_t missing_data (ACE_Message_Block &message_block);
+
+ /* Extract the details of the next message from the <incoming>
+ * through <qd>. Returns 1 if there are more messages and returns a
+ * 0 if there are no more messages in <incoming>.
+ */
+ virtual int extract_next_message (ACE_Message_Block &incoming,
+ TAO_Queued_Data *&qd);
+
+ /// Check whether the node <qd> needs consolidation from <incoming>
+ virtual int consolidate_node (TAO_Queued_Data *qd,
+ ACE_Message_Block &incoming);
+
+ /// Get the details of the message parsed through the <qd>.
+ virtual void get_message_data (TAO_Queued_Data *qd);
+
+ /// @@Bala: Docu???
+ virtual int consolidate_fragments (TAO_Queued_Data *dqd,
+ const TAO_Queued_Data *sqd);
/// Process the request message that we have received on the
/// connection
virtual int process_request_message (TAO_Transport *transport,
- TAO_ORB_Core *orb_core);
+ TAO_Queued_Data *qd);
/// Parse the reply message that we received and return the reply
/// information though <reply_info>
@@ -129,41 +147,26 @@ private:
/// Processes the <GIOP_REQUEST> messages
int process_request (TAO_Transport *transport,
- TAO_ORB_Core *orb_core,
- TAO_InputCDR &input);
+ TAO_InputCDR &input,
+ TAO_OutputCDR &output);
/// Processes the <GIOP_LOCATE_REQUEST> messages
int process_locate_request (TAO_Transport *transport,
- TAO_ORB_Core *orb_core,
- TAO_InputCDR &input);
+ TAO_InputCDR &input,
+ TAO_OutputCDR &output);
/// Make a <GIOP_LOCATEREPLY> and hand that over to the transport so
/// that it can be sent over the connection.
/// NOTE:As on date 1.1 & 1.2 seem to have similar headers. Till an
/// unmanageable difference comes let them be implemented here.
int make_send_locate_reply (TAO_Transport *transport,
+ TAO_OutputCDR &output,
TAO_GIOP_Locate_Request_Header &request,
TAO_GIOP_Locate_Status_Msg &status);
/// Send error messages
int send_error (TAO_Transport *transport);
- /// Parses the header of the GIOP messages for validity
- int parse_header (void);
-
- /// Validates the first 4 bytes that contain the magic word
- /// "GIOP". Also calls the validate_version () on the incoming
- /// stream.
- int parse_magic_bytes (void);
-
- /// This will do a validation of the stream that arrive in the
- /// transport.
- int validate_version (void);
-
- /// Set the state
- void set_state (CORBA::Octet major,
- CORBA::Octet minor);
-
/// Close a connection, first sending GIOP::CloseConnection.
void send_close_connection (const TAO_GIOP_Message_Version &version,
TAO_Transport *transport,
@@ -182,6 +185,8 @@ private:
const u_char *ptr,
size_t len);
+ TAO_Queued_Data *make_queued_data (size_t sz);
+
/// Write the locate reply header
virtual int generate_locate_reply_header (
TAO_OutputCDR & /*cdr*/,
@@ -238,33 +243,18 @@ private:
private:
- /// The message state. It represents the status of the messages that
- /// have been read from the connection.
- TAO_GIOP_Message_State message_state_;
-
- /// Output CDR
- TAO_OutputCDR *output_;
-
- /// Allocators for the output CDR that we hold. As we cannot rely on
- /// the resources from ORB Core we reserve our own resources. The
- /// reason that we cannot believe the ORB core is that, for a
- /// multi-threaded servers it dishes out resources cached in
- /// TSS. This would be dangerous as TSS gets destroyed before we
- /// would. So we have our own memory that we can rely on.
- /// Implementations of GIOP that we have
- ACE_Allocator *cdr_buffer_alloc_;
- ACE_Allocator *cdr_dblock_alloc_;
- ACE_Allocator *cdr_msgblock_alloc_;
-
- /// A buffer that we will use to initialise the CDR stream
- char repbuf_[ACE_CDR::DEFAULT_BUFSIZE];
-
- /// The InputCDR stream in which the incoming messages are
- /// read. This will be used to decode the messages.
- TAO_InputCDR input_cdr_;
-
- /// The offset of the write pointer of the input cdr stream
- size_t current_offset_;
+ /// Our copy of the ORB core...
+ TAO_ORB_Core *orb_core_;
+
+ /// The message type that we are going to process..
+ CORBA::Octet message_type_;
+
+ /// The pay load size
+ CORBA::ULong message_size_;
+
+ // The byte order..
+ // NOTE: GIOP lite cannot work between heterogenous platforms..
+ CORBA::Octet byte_order_;
};
diff --git a/TAO/tao/IIOP_Transport.cpp b/TAO/tao/IIOP_Transport.cpp
index d3c664db9af..85e5f4b1d84 100644
--- a/TAO/tao/IIOP_Transport.cpp
+++ b/TAO/tao/IIOP_Transport.cpp
@@ -16,7 +16,7 @@
#include "tao/ORB_Core.h"
#include "tao/debug.h"
#include "tao/GIOP_Message_Base.h"
-//#include "tao/GIOP_Message_Lite.h"
+#include "tao/GIOP_Message_Lite.h"
#if !defined (__ACE_INLINE__)
# include "tao/IIOP_Transport.i"
@@ -26,13 +26,12 @@ ACE_RCSID (tao, IIOP_Transport, "$Id$")
TAO_IIOP_Transport::TAO_IIOP_Transport (TAO_IIOP_Connection_Handler *handler,
TAO_ORB_Core *orb_core,
- CORBA::Boolean /*flag*/)
+ CORBA::Boolean flag)
: TAO_Transport (TAO_TAG_IIOP_PROFILE,
orb_core)
, connection_handler_ (handler)
, messaging_object_ (0)
{
-#if 0
if (flag)
{
// Use the lite version of the protocol
@@ -40,7 +39,6 @@ TAO_IIOP_Transport::TAO_IIOP_Transport (TAO_IIOP_Connection_Handler *handler,
TAO_GIOP_Message_Lite (orb_core));
}
else
-#endif
{
// Use the normal GIOP object
ACE_NEW (this->messaging_object_,
diff --git a/TAO/tao/Incoming_Message_Queue.cpp b/TAO/tao/Incoming_Message_Queue.cpp
index 17f8b31d6b9..737e329ea7b 100644
--- a/TAO/tao/Incoming_Message_Queue.cpp
+++ b/TAO/tao/Incoming_Message_Queue.cpp
@@ -134,6 +134,7 @@ TAO_Queued_Data::TAO_Queued_Data (void)
byte_order_ (0),
major_version_ (0),
minor_version_ (0),
+ more_fragments_ (0),
msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR),
next_ (0)
{
@@ -145,7 +146,20 @@ TAO_Queued_Data::TAO_Queued_Data (ACE_Message_Block *mb)
byte_order_ (0),
major_version_ (0),
minor_version_ (0),
+ more_fragments_ (0),
msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR),
next_ (0)
{
}
+
+TAO_Queued_Data::TAO_Queued_Data (const TAO_Queued_Data &qd)
+ : msg_block_ (qd.msg_block_->duplicate ()),
+ missing_data_ (qd.missing_data_),
+ byte_order_ (qd.byte_order_),
+ major_version_ (qd.major_version_),
+ minor_version_ (qd.minor_version_),
+ more_fragments_ (qd.more_fragments_),
+ msg_type_ (qd.msg_type_),
+ next_ (0)
+{
+}
diff --git a/TAO/tao/Incoming_Message_Queue.h b/TAO/tao/Incoming_Message_Queue.h
index 20fbdc17d04..6037ca825af 100644
--- a/TAO/tao/Incoming_Message_Queue.h
+++ b/TAO/tao/Incoming_Message_Queue.h
@@ -78,6 +78,10 @@ public:
int is_tail_complete (void);
int is_head_complete (void);
+ /// This method checks whether the last message that was queued up
+ /// was fragmented...
+ int is_tail_fragmented (void);
+
/// Return the size of data that is missing in tail of the queue.
size_t missing_data_tail (void) const;
/// void missing_data (size_t data);
@@ -123,11 +127,18 @@ public:
/// Constructor.
TAO_Queued_Data (ACE_Message_Block *mb);
+ /// Copy constructor.
+ TAO_Queued_Data (const TAO_Queued_Data &qd);
+
/// Creation and deletion of a node in the queue.
static TAO_Queued_Data* get_queued_data (void);
static void release (TAO_Queued_Data *qd);
+ /// Duplicate ourselves. This creates a copy of ourselves on the
+ /// heap and returns a pointer to the duplicated node.
+ static TAO_Queued_Data* duplicate (TAO_Queued_Data &qd);
+
/// The message block that contains the message.
ACE_Message_Block *msg_block_;
@@ -142,9 +153,14 @@ public:
/// information that would be needed to read and decipher the
/// message.
CORBA::Octet major_version_;
-
CORBA::Octet minor_version_;
+ /// Some messages can be fragmented by the protocol (this is an ORB
+ /// level fragmentation on top of the TCP/IP fragmentation. This
+ /// member indicates whether the message that we have recd. and
+ /// queue already has more fragments that is missing..
+ CORBA::Octet more_fragments_;
+
/// The message type of the message
TAO_Pluggable_Message_Type msg_type_;
diff --git a/TAO/tao/Incoming_Message_Queue.inl b/TAO/tao/Incoming_Message_Queue.inl
index 13ee962af2f..df61432d461 100644
--- a/TAO/tao/Incoming_Message_Queue.inl
+++ b/TAO/tao/Incoming_Message_Queue.inl
@@ -3,6 +3,7 @@
/************************************************************************/
// Methods for TAO_Queued_Data
/************************************************************************/
+/*static*/
ACE_INLINE TAO_Queued_Data *
TAO_Queued_Data::get_queued_data (void)
{
@@ -15,6 +16,7 @@ TAO_Queued_Data::get_queued_data (void)
return qd;
}
+/*static*/
ACE_INLINE void
TAO_Queued_Data::release (TAO_Queued_Data *qd)
{
@@ -24,6 +26,23 @@ TAO_Queued_Data::release (TAO_Queued_Data *qd)
delete qd;
}
+/*static*/
+ACE_INLINE TAO_Queued_Data *
+TAO_Queued_Data::duplicate (TAO_Queued_Data &sqd)
+{
+ // @@TODO: Use the global pool for allocationg...
+ TAO_Queued_Data *qd = 0;
+ ACE_NEW_RETURN (qd,
+ TAO_Queued_Data (sqd),
+ 0);
+
+ return qd;
+}
+
+/************************************************************************/
+// Methods for TAO_Incoming_Message_Queue
+/************************************************************************/
+
ACE_INLINE CORBA::ULong
TAO_Incoming_Message_Queue::queue_length (void)
{
@@ -38,7 +57,8 @@ TAO_Incoming_Message_Queue::is_tail_complete (void)
return -1;
if (this->size_ &&
- this->queued_data_->missing_data_ == 0)
+ this->queued_data_->missing_data_ == 0 &&
+ this->queued_data_->more_fragments_ == 0)
return 1;
return 0;
@@ -51,7 +71,21 @@ TAO_Incoming_Message_Queue::is_head_complete (void)
return -1;
if (this->size_ &&
- this->queued_data_->next_->missing_data_ == 0)
+ this->queued_data_->next_->missing_data_ == 0 &&
+ this->queued_data_->more_fragments_ == 0)
+ return 1;
+
+ return 0;
+}
+
+ACE_INLINE int
+TAO_Incoming_Message_Queue::is_tail_fragmented (void)
+{
+ if (this->size_ == 0)
+ return 0;
+
+ if (this->size_ &&
+ this->queued_data_->more_fragments_ == 0)
return 1;
return 0;
diff --git a/TAO/tao/Makefile b/TAO/tao/Makefile
index 61d72f65cf9..9b9dd8ea637 100644
--- a/TAO/tao/Makefile
+++ b/TAO/tao/Makefile
@@ -108,6 +108,7 @@ PLUGGABLE_MESSAGING_FILES = \
Pluggable_Messaging \
Pluggable_Messaging_Utils \
GIOP_Message_Base \
+ GIOP_Message_Lite \
GIOP_Message_Generator_Parser \
GIOP_Message_Generator_Parser_10 \
GIOP_Message_Generator_Parser_11 \
diff --git a/TAO/tao/Pluggable_Messaging.h b/TAO/tao/Pluggable_Messaging.h
index db21cc55df8..dd7ff672518 100644
--- a/TAO/tao/Pluggable_Messaging.h
+++ b/TAO/tao/Pluggable_Messaging.h
@@ -134,6 +134,10 @@ public:
virtual int consolidate_node (TAO_Queued_Data *qd,
ACE_Message_Block &incoming) = 0;
+ /// @@Bala:Docu??
+ virtual int consolidate_fragments (TAO_Queued_Data *dqd,
+ const TAO_Queued_Data *sqd) = 0;
+
/// Parse the request message, make an upcall and send the reply back
/// to the "request initiator"
virtual int process_request_message (TAO_Transport *transport,
@@ -158,7 +162,7 @@ public:
virtual int is_ready_for_bidirectional (void) = 0;
/// Reset the messaging the object
- virtual void reset (int reset_flag = 1) = 0;
+ virtual void reset (void) = 0;
};
#if defined (__ACE_INLINE__)
diff --git a/TAO/tao/Strategies/DIOP_Transport.cpp b/TAO/tao/Strategies/DIOP_Transport.cpp
index 8ed6aa9b4a6..eef853367fb 100644
--- a/TAO/tao/Strategies/DIOP_Transport.cpp
+++ b/TAO/tao/Strategies/DIOP_Transport.cpp
@@ -20,7 +20,7 @@
#include "tao/debug.h"
#include "tao/Resume_Handle.h"
#include "tao/GIOP_Message_Base.h"
-// #include "tao/GIOP_Message_Lite.h"
+#include "tao/GIOP_Message_Lite.h"
#if !defined (__ACE_INLINE__)
# include "DIOP_Transport.i"
@@ -30,7 +30,7 @@ ACE_RCSID (tao, DIOP_Transport, "$Id$")
TAO_DIOP_Transport::TAO_DIOP_Transport (TAO_DIOP_Connection_Handler *handler,
TAO_ORB_Core *orb_core,
- CORBA::Boolean /*flag*/)
+ CORBA::Boolean flag)
: TAO_Transport (TAO_TAG_UDP_PROFILE,
orb_core)
, connection_handler_ (handler)
@@ -38,10 +38,20 @@ TAO_DIOP_Transport::TAO_DIOP_Transport (TAO_DIOP_Connection_Handler *handler,
{
// @@ Michael: Set the input CDR size to ACE_MAX_DGRAM_SIZE so that
// we read the whole UDP packet on a single read.
-
- ACE_NEW (this->messaging_object_,
- TAO_GIOP_Message_Base (orb_core,
- ACE_MAX_DGRAM_SIZE));
+ if (flag)
+ {
+ // Use the lite version of the protocol
+ ACE_NEW (this->messaging_object_,
+ TAO_GIOP_Message_Lite (orb_core,
+ ACE_MAX_DGRAM_SIZE));
+ }
+ else
+ {
+ // Use the normal GIOP object
+ ACE_NEW (this->messaging_object_,
+ TAO_GIOP_Message_Base (orb_core,
+ ACE_MAX_DGRAM_SIZE));
+ }
}
TAO_DIOP_Transport::~TAO_DIOP_Transport (void)
diff --git a/TAO/tao/Strategies/SHMIOP_Transport.cpp b/TAO/tao/Strategies/SHMIOP_Transport.cpp
index 0b333e0c566..6284a64ad71 100644
--- a/TAO/tao/Strategies/SHMIOP_Transport.cpp
+++ b/TAO/tao/Strategies/SHMIOP_Transport.cpp
@@ -17,6 +17,7 @@
#include "tao/debug.h"
#include "tao/Resume_Handle.h"
#include "tao/GIOP_Message_Base.h"
+#include "tao/GIOP_Message_Lite.h"
#if !defined (__ACE_INLINE__)
@@ -27,13 +28,12 @@ ACE_RCSID (Strategies, SHMIOP_Transport, "$Id$")
TAO_SHMIOP_Transport::TAO_SHMIOP_Transport (TAO_SHMIOP_Connection_Handler *handler,
TAO_ORB_Core *orb_core,
- CORBA::Boolean /*flag*/)
+ CORBA::Boolean flag)
: TAO_Transport (TAO_TAG_SHMEM_PROFILE,
orb_core),
connection_handler_ (handler),
messaging_object_ (0)
{
-#if 0
if (flag)
{
// Use the lite version of the protocol
@@ -41,7 +41,6 @@ TAO_SHMIOP_Transport::TAO_SHMIOP_Transport (TAO_SHMIOP_Connection_Handler *handl
TAO_GIOP_Message_Lite (orb_core));
}
else
-#endif /*#if 0 */
{
// Use the normal GIOP object
ACE_NEW (this->messaging_object_,
diff --git a/TAO/tao/Strategies/UIOP_Transport.cpp b/TAO/tao/Strategies/UIOP_Transport.cpp
index 5577150de4a..2baf994c1ac 100644
--- a/TAO/tao/Strategies/UIOP_Transport.cpp
+++ b/TAO/tao/Strategies/UIOP_Transport.cpp
@@ -16,7 +16,7 @@
#include "tao/ORB_Core.h"
#include "tao/debug.h"
#include "tao/GIOP_Message_Base.h"
-//#include "tao/GIOP_Message_Lite.h"
+#include "tao/GIOP_Message_Lite.h"
#if !defined (__ACE_INLINE__)
# include "UIOP_Transport.i"
@@ -27,19 +27,19 @@ ACE_RCSID (Strategies, UIOP_Transport, "$Id$")
TAO_UIOP_Transport::TAO_UIOP_Transport (TAO_UIOP_Connection_Handler *handler,
TAO_ORB_Core *orb_core,
- CORBA::Boolean /*flag*/)
+ CORBA::Boolean flag)
: TAO_Transport (TAO_TAG_UIOP_PROFILE,
orb_core)
, connection_handler_ (handler)
, messaging_object_ (0)
{
- /* if (flag)
+ if (flag)
{
// Use the lite version of the protocol
ACE_NEW (this->messaging_object_,
TAO_GIOP_Message_Lite (orb_core));
}
- else*/
+ else
{
// Use the normal GIOP object
ACE_NEW (this->messaging_object_,
diff --git a/TAO/tao/TAO.dsp b/TAO/tao/TAO.dsp
index 62572777b23..b319103ab57 100644
--- a/TAO/tao/TAO.dsp
+++ b/TAO/tao/TAO.dsp
@@ -411,6 +411,10 @@ SOURCE=.\GIOP_Message_Generator_Parser_Impl.cpp
# End Source File
# Begin Source File
+SOURCE=.\GIOP_Message_Lite.cpp
+# End Source File
+# Begin Source File
+
SOURCE=.\GIOP_Message_Locate_Header.cpp
# End Source File
# Begin Source File
@@ -1143,6 +1147,10 @@ SOURCE=.\GIOP_Message_Generator_Parser_Impl.h
# End Source File
# Begin Source File
+SOURCE=.\GIOP_Message_Lite.h
+# End Source File
+# Begin Source File
+
SOURCE=.\GIOP_Message_Locate_Header.h
# End Source File
# Begin Source File
@@ -1859,6 +1867,10 @@ SOURCE=.\GIOP_Message_Generator_Parser_Impl.inl
# End Source File
# Begin Source File
+SOURCE=.\GIOP_Message_Lite.i
+# End Source File
+# Begin Source File
+
SOURCE=.\GIOP_Message_Locate_Header.i
# End Source File
# Begin Source File
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp
index a91348edf7a..09212a06a5b 100644
--- a/TAO/tao/Transport.cpp
+++ b/TAO/tao/Transport.cpp
@@ -2,6 +2,7 @@
// $Id$
+
#include "Transport.h"
#include "Exception.h"
@@ -888,6 +889,23 @@ TAO_Transport::handle_input_i (TAO_Resume_Handle &rh,
// Extract the data for the node..
this->messaging_object ()->get_message_data (&qd);
+ // Check whether the message was fragmented..
+ if (qd.more_fragments_ ||
+ (qd.msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT))
+ {
+ // Make a copy of the message that we have
+ ACE_Data_Block *ndb =
+ message_block.data_block ()->clone ();
+
+ // Replace the underlying the datablock
+ message_block.replace_data_block (ndb);
+
+ // Duplicate the node that we have as the node is on stack..
+ TAO_Queued_Data *nqd = TAO_Queued_Data::duplicate (qd);
+
+ return this->consolidate_fragments (nqd, rh);
+ }
+
// Resume before starting to process the request..
rh.resume_handle ();
@@ -908,6 +926,7 @@ TAO_Transport::parse_consolidate_messages (ACE_Message_Block &block,
// Check whether we have a complete message for processing
ssize_t missing_data = this->missing_data (block);
+
if (missing_data < 0)
{
// If we have more than one message
@@ -1070,6 +1089,17 @@ TAO_Transport::consolidate_message (ACE_Message_Block &incoming,
pqd.missing_data_ = missing_data;
this->messaging_object ()->get_message_data (&pqd);
+ // Check whether the message was fragmented and try to consolidate
+ // the fragments..
+ if (pqd.more_fragments_ ||
+ (pqd.msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT))
+ {
+ // Duplicate the queued data as it is on stack..
+ TAO_Queued_Data *nqd = TAO_Queued_Data::duplicate (pqd);
+
+ return this->consolidate_fragments (nqd, rh);
+ }
+
// Resume the handle before processing the request
rh.resume_handle ();
@@ -1079,6 +1109,45 @@ TAO_Transport::consolidate_message (ACE_Message_Block &incoming,
}
int
+TAO_Transport::consolidate_fragments (TAO_Queued_Data *qd,
+ TAO_Resume_Handle &rh)
+{
+ // If we have received a fragment message then we have to
+ // consolidate <qd> with the last message in queue
+ // @@todo: this piece of logic follows GIOP a bit... Need to revisit
+ // if we have protocols other than GIOP
+
+ // @@todo: Fragments now have too much copying overhead. Need to get
+ // them out if we want to have some reasonable performance metrics
+ // in future.. Post 1.2 seems a nice time..
+ if (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)
+ {
+ TAO_Queued_Data *tqd =
+ this->incoming_message_queue_.dequeue_tail ();
+
+ tqd->more_fragments_ = qd->more_fragments_;
+
+ if (this->messaging_object ()->consolidate_fragments (tqd,
+ qd) == -1)
+ return -1;
+
+ TAO_Queued_Data::release (qd);
+
+ this->incoming_message_queue_.enqueue_tail (tqd);
+
+ this->process_queue_head (rh);
+ }
+ else
+ {
+ // if we dont have a fragment already in the queue just add it in
+ // the queue
+ this->incoming_message_queue_.enqueue_tail (qd);
+ }
+
+ return 0;
+}
+
+int
TAO_Transport::consolidate_message_queue (ACE_Message_Block &incoming,
ssize_t missing_data,
TAO_Resume_Handle &rh,
@@ -1241,7 +1310,18 @@ TAO_Transport::consolidate_extra_messages (ACE_Message_Block
this->messaging_object ()->extract_next_message (incoming,
q_data);
if (q_data)
- this->incoming_message_queue_.enqueue_tail (q_data);
+ {
+ // If we have read a framented message then...
+ if (q_data->more_fragments_ ||
+ q_data->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)
+ {
+ this->consolidate_fragments (q_data, rh);
+ }
+ else
+ {
+ this->incoming_message_queue_.enqueue_tail (q_data);
+ }
+ }
}
// In case of error return..
diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h
index 009ce9025ed..6405bbd50f8 100644
--- a/TAO/tao/Transport.h
+++ b/TAO/tao/Transport.h
@@ -636,6 +636,12 @@ protected:
TAO_Resume_Handle &rh,
ACE_Time_Value *max_wait_time);
+ /// @@Bala: Docu???
+ int consolidate_fragments (TAO_Queued_Data *qd,
+ TAO_Resume_Handle &rh);
+
+
+
/// First consolidate the message queue. If the message is still not
/// complete, try to read from the handle again to make it
/// complete. If these dont help put the message back in the queue
diff --git a/TAO/tao/Transport.inl b/TAO/tao/Transport.inl
index b53fc80a7d7..06411ff3179 100644
--- a/TAO/tao/Transport.inl
+++ b/TAO/tao/Transport.inl
@@ -1,3 +1,4 @@
+// -*- C++ -*-
// $Id$
ACE_INLINE CORBA::ULong
@@ -83,7 +84,7 @@ TAO_Transport::purging_order (void) const
{
return this->purging_order_;
}
-
+
ACE_INLINE void
TAO_Transport::purging_order (unsigned long value)
{
diff --git a/TAO/tao/orbconf.h b/TAO/tao/orbconf.h
index 12336939a76..52d26fc85f9 100644
--- a/TAO/tao/orbconf.h
+++ b/TAO/tao/orbconf.h
@@ -279,6 +279,11 @@
// Define if your processor does not store words with the most significant
// byte first.
+
+// @todo: It seems to be that this definition of TAO_ENCAP_BYTE_ORDER
+// should be removed. We have an equivalent ACE definition in
+// ACE_CDR_BYTE_ORDER. Today both of them are consistent. It would be
+// a havoc if oneday this consistency is gone..
#if defined (ACE_LITTLE_ENDIAN)
# define TAO_ENCAP_BYTE_ORDER 1 /* little endian encapsulation byte order has
the value = 1 */
@@ -928,6 +933,14 @@ enum TAO_Policy_Scope
#define TAO_DEF_GIOP_MINOR 2
#endif /* TAO_DEF_GIOP_MINOR */
+#if !defined (TAO_CONNECTION_HANDLER_STACK_BUF_SIZE)
+# define TAO_CONNECTION_HANDLER_STACK_BUF_SIZE 1024
+#endif /*TAO_CONNECTION_HANDLER_STACK_BUF_SIZE */
+
+#if !defined (TAO_RESUMES_CONNECTION_HANDLER)
+# define TAO_RESUMES_CONNECTION_HANDLER 1
+#endif /*TAO_RESUMES_CONNECTION_HANDLER*/
+
// By default TAO generate the OMG standard profile components
// (ORB_TYPE and CODE_SETS)
#define TAO_STD_PROFILE_COMPONENTS