summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbala <balanatarajan@users.noreply.github.com>2001-06-17 23:11:02 +0000
committerbala <balanatarajan@users.noreply.github.com>2001-06-17 23:11:02 +0000
commit6ceb20f11b609d45c6bc5687808178b4bd62e826 (patch)
treeb0d8856c3506c1bdc0e893e1766fc9187f9cc8ff
parent735b2e55f740e7183aad0e4877e5b68f1bbe5845 (diff)
downloadATCD-6ceb20f11b609d45c6bc5687808178b4bd62e826.tar.gz
ChangeLogTag:Sat Jun 17 17:46:23 2001 Balachandran Natarajan <bala@cs.wustl.edu>
-rw-r--r--TAO/ChangeLogs/ChangeLog-02a73
-rw-r--r--TAO/tao/Asynch_Reply_Dispatcher.cpp12
-rw-r--r--TAO/tao/DynamicInterface/DII_Reply_Dispatcher.cpp5
-rw-r--r--TAO/tao/GIOP_Message_Base.cpp196
-rw-r--r--TAO/tao/GIOP_Message_Base.h16
-rw-r--r--TAO/tao/GIOP_Message_State.cpp2
-rw-r--r--TAO/tao/Incoming_Message_Queue.cpp119
-rw-r--r--TAO/tao/Incoming_Message_Queue.h21
-rw-r--r--TAO/tao/Incoming_Message_Queue.inl35
-rw-r--r--TAO/tao/LIST_OF_TODO8
-rw-r--r--TAO/tao/ORB.cpp8
-rw-r--r--TAO/tao/ORB_Core.cpp1
-rw-r--r--TAO/tao/ORB_Core.h3
-rw-r--r--TAO/tao/ORB_Core.i9
-rw-r--r--TAO/tao/Pluggable_Messaging.h10
-rw-r--r--TAO/tao/Synch_Reply_Dispatcher.cpp23
-rw-r--r--TAO/tao/Transport.cpp264
-rw-r--r--TAO/tao/Transport.h32
18 files changed, 509 insertions, 328 deletions
diff --git a/TAO/ChangeLogs/ChangeLog-02a b/TAO/ChangeLogs/ChangeLog-02a
index 41695602f91..cff70c321ef 100644
--- a/TAO/ChangeLogs/ChangeLog-02a
+++ b/TAO/ChangeLogs/ChangeLog-02a
@@ -1,3 +1,76 @@
+Sat Jun 17 17:46:23 2001 Balachandran Natarajan <bala@cs.wustl.edu>
+
+ This set of changes comes with complete revamping of the previous
+ design. The flaws with the previous design were as follows
+ (1) We were unnecessarily penalising large data blocks. We were
+ trying to read a particular size of data till the data was
+ completely removed from the socket. This was totally
+ ridiculous because we were doing more reads than required.
+
+ (2) The message block that was constructed on the stack with a
+ buffer from stack never did what we wanted. It was allocating
+ a data block on the heap and was thus spoiling whatever
+ optimization that we had tried putting in.
+
+ (3) The incoming message Queue is now managed by the TAO_Transport
+ object instead of the GIOP classes.
+
+ * tao/GIOP_Message_Base.cpp:
+ * tao/GIOP_Message_Base.h: Removed the references to the incoming
+ message queue. Implemention for two methods missing_data () and
+ byte_order (). Added an extra argument to the methods
+ process_request_message () and process_reply_message (). Used
+ the incoming message block to create a input CDR with the
+ DONT_DELETE flag so that the data block is not deleted after
+ request processing.
+
+ * tao/GIOP_Message_State.cpp: Removed the inclusion of
+ Transport.h.
+
+ * tao/Incoming_Message_Queue.h:
+ * tao/Incoming_Message_Queue.cpp:
+ * tao/Incoming_Message_Queue.inl: Added an argument to the
+ add_message (). Further the implementation of add_message () has
+ changed a bit. It now adds only a new message to the queue. It
+ doesn't modify a half filled queue. The TAO_Transport object
+ does that job. So declared the TAO_Transport as the friend class
+ of the Incoming_Message_Queue.
+
+ Changed the name of the methods complete_message () as
+ is_complete_message (). Removed the methods current_message ()&
+ current_byte_order ().
+
+ Added a new method copy_message () which copies messages into
+ the half empty nodes.
+
+ Added a method wr_ptr () to access the write pointer of the tail
+ node that has halfempty message.
+
+ * tao/Transport.cpp:
+ * tao/Transport.h: The Incoming Message Queue is now managed by
+ this class. Added the following methods
+ - missing_data ()
+ - parse_incoming_messages ()
+ - check_message_integrity ()
+ - consolidate_message ()
+ - conslodate_message_queue ()
+
+ * tao/Pluggable_Messaging.h: Added two new virtual functions
+ missing_data () and byte_order ().
+
+ * tao/ORB_Core.h:
+ * tao/ORB_Core.cpp:
+ * tao/ORB_Core.i: Added an accessor for the locking_strategy used
+ for the CDR blocks.
+
+ * tao/Synch_Reply_Dispatcher.cpp:
+ * tao/Asynch_Reply_Dispatcher.cpp:
+ * tao/DynamicInterface/DII_Reply_Dispatcher.cpp: Changed the
+ exchange_data_block () to clone_from () which is a new method in
+ ACE_InputCDR.
+
+ * tao/LIST_OF_TODO: Updated the list
+
Sat Jun 16 15:49:23 2001 Balachandran Natarajan <bala@cs.wustl.edu>
* tao/Any.cpp:
diff --git a/TAO/tao/Asynch_Reply_Dispatcher.cpp b/TAO/tao/Asynch_Reply_Dispatcher.cpp
index b4a20a82253..110df977c28 100644
--- a/TAO/tao/Asynch_Reply_Dispatcher.cpp
+++ b/TAO/tao/Asynch_Reply_Dispatcher.cpp
@@ -48,11 +48,6 @@ TAO_Asynch_Reply_Dispatcher_Base::dispatch_reply (
return 0;
}
-/*TAO_GIOP_Message_State *
-TAO_Asynch_Reply_Dispatcher_Base::message_state (void)
-{
- return this->message_state_;
-} */
void
TAO_Asynch_Reply_Dispatcher_Base::dispatcher_bound (TAO_Transport *)
@@ -113,10 +108,9 @@ TAO_Asynch_Reply_Dispatcher::dispatch_reply (
this->reply_status_ = params.reply_status_;
- // this->message_state_ = message_state;
-
- // Steal the buffer so that no copying is done.
- this->reply_cdr_.exchange_data_blocks (params.input_cdr_);
+ // Transfer the <params.input_cdr_>'s content to this->reply_cdr_
+ ACE_Data_Block *db =
+ this->reply_cdr_.clone_from (params.input_cdr_);
// Steal the buffer, that way we don't do any unnecesary copies of
// this data.
diff --git a/TAO/tao/DynamicInterface/DII_Reply_Dispatcher.cpp b/TAO/tao/DynamicInterface/DII_Reply_Dispatcher.cpp
index 5471cd4d1f3..b4449b1deaf 100644
--- a/TAO/tao/DynamicInterface/DII_Reply_Dispatcher.cpp
+++ b/TAO/tao/DynamicInterface/DII_Reply_Dispatcher.cpp
@@ -44,8 +44,9 @@ TAO_DII_Deferred_Reply_Dispatcher::dispatch_reply (
{
this->reply_status_ = params.reply_status_;
- // Steal the buffer so that no copying is done.
- this->reply_cdr_.steal_from (params.input_cdr_);
+ // Transfer the <params.input_cdr_>'s content to this->reply_cdr_
+ ACE_Data_Block *db =
+ this->reply_cdr_.clone_from (params.input_cdr_);
// Steal the buffer, that way we don't do any unnecesary copies of
// this data.
diff --git a/TAO/tao/GIOP_Message_Base.cpp b/TAO/tao/GIOP_Message_Base.cpp
index cbe8b53ea27..ab01a84ac98 100644
--- a/TAO/tao/GIOP_Message_Base.cpp
+++ b/TAO/tao/GIOP_Message_Base.cpp
@@ -21,7 +21,6 @@ TAO_GIOP_Message_Base::TAO_GIOP_Message_Base (TAO_ORB_Core *orb_core,
size_t /*input_cdr_size*/)
: message_state_ (orb_core,
this),
- message_queue_ (orb_core),
output_ (0),
generator_parser_ (0)
{
@@ -38,6 +37,8 @@ TAO_GIOP_Message_Base::TAO_GIOP_Message_Base (TAO_ORB_Core *orb_core,
orb_core->message_block_dblock_allocator (),
orb_core->message_block_msgblock_allocator (),
orb_core->orb_params ()->cdr_memcpy_tradeoff (),
+ TAO_DEF_GIOP_MAJOR,
+ TAO_DEF_GIOP_MINOR,
orb_core->to_iso8859 (),
orb_core->to_unicode ()));
}
@@ -315,13 +316,7 @@ TAO_GIOP_Message_Base::message_type (void)
int
TAO_GIOP_Message_Base::parse_incoming_messages (ACE_Message_Block &incoming)
{
- // If we have a queue and if the last message is not complete a
- // complete one, then this read will get us the remaining data. So
- // do not try to parse the header if we have an incomplete message
- // in the queue.
- if (this->message_queue_.queue_length () > 0 &&
- this->message_queue_.complete_message () == 0)
- return 0;
+
if (this->message_state_.parse_message_header (incoming) == -1)
{
@@ -334,48 +329,36 @@ TAO_GIOP_Message_Base::parse_incoming_messages (ACE_Message_Block &incoming)
return 0;
}
-int
-TAO_GIOP_Message_Base::is_message_complete (ACE_Message_Block &incoming)
+size_t
+TAO_GIOP_Message_Base::missing_data (ACE_Message_Block &incoming)
{
- // If we have atleast one message in the message queue we just add
- // the message to the queue
- if (this->message_queue_.queue_length ())
- {
- // Add the new message to the Queue
- this->message_queue_.add_message (incoming,
- this->message_state_);
-
- // We could have gotten the rest of the message of a previous
- // incomplete read. We query to figure whether we have a
- // complete message...
- return this->message_queue_.complete_message ();
- }
-
- size_t len = incoming.length ();
-
- len -= TAO_GIOP_MESSAGE_HEADER_LEN;
-
+ // @@Bala: Look for fragmentation here..
// If we had recd. fragmented messages and if the GIOP minor version
// is greater than 1, then include the FRAGMENT HEADER to calculate
// the effective length of the message
- if (this->message_state_.more_fragments_ &&
+ /*if (this->message_state_.more_fragments_ &&
this->message_state_.giop_version_.minor > 1)
- len -= TAO_GIOP_MESSAGE_FRAGMENT_HEADER;
+ len -= TAO_GIOP_MESSAGE_FRAGMENT_HEADER;
+ */
+
+ size_t len = incoming.length ();
+
+ if (len >= this->message_state_.message_size ())
+ return 0;
+
+ return this->message_state_.message_size () - len;
+}
+
+CORBA::Octet
+TAO_GIOP_Message_Base::byte_order (void)
+{
+ return this->message_state_.byte_order_;
+}
+
+int
+TAO_GIOP_Message_Base::is_message_complete (ACE_Message_Block &incoming)
+{
- if (len == this->message_state_.message_size_)
- return 1;
- else
- {
- // If we have a bigger or smaller message we add the message to
- // the queue.
- this->message_queue_.add_message (incoming,
- this->message_state_);
-
- // We could have gotten the rest of the message of a previous
- // incomplete read. We query to figure whether we have a
- // complete message...
- return this->message_queue_.complete_message ();
- }
// @@Bala: Implement other cases
return 0;
@@ -384,7 +367,8 @@ TAO_GIOP_Message_Base::is_message_complete (ACE_Message_Block &incoming)
int
TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport,
TAO_ORB_Core *orb_core,
- ACE_Message_Block &incoming)
+ ACE_Message_Block &incoming,
+ CORBA::Octet byte_order)
{
// Set the upcall thread
orb_core->lf_strategy ().set_upcall_thread (orb_core->leader_follower ());
@@ -393,64 +377,26 @@ TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport,
// @@@@Is it necessary here?
this->output_->reset ();
- CORBA::Octet byte_order = 0;
- size_t rd_pos = 0;
- size_t wr_pos = 0;
- ACE_Data_Block *data = 0;
- size_t len = 0;
- char *ptr = 0;
- // At this point we have data in the queue or in the <incoming>
- // message block. If we have data in the queue we process that
- // first. At this point we are just assuming that (and it is a
- // pretty good assumption) that the data in <incoming> has already
- // been copied.
- if (this->message_queue_.queue_length ())
- {
- // As we have a message in the queue let us process that. We do
- // that by taking the data block off the queue and sticking it
- // in the <incoming> message block..
- data =
- this->message_queue_.get_current_message (byte_order);
-
- ptr = data->base ();
- // Set the read and write pointer positions
- // @@ What do we if we get Fragmented messages?
- rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN;
- len = wr_pos = data->size ();
-
- }
- else
- {
- // Get the read and write positions before we steal data.
- // @@ Bala:Need to change this
- rd_pos = incoming.rd_ptr () - incoming.base ();
- wr_pos = incoming.wr_ptr () - incoming.base ();
- rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN;
- byte_order = this->message_state_.byte_order_;
-
- data = incoming.data_block ()->duplicate ();
-
- len = incoming.length ();
- ptr = incoming.rd_ptr ();
- // Duplicate the data block
- // @@Do we need to ??
- // ACE_Data_Block *data =
- //incoming.data_block ()->duplicate ();
- }
-
+ // Get the read and write positions before we steal data.
+ size_t rd_pos = incoming.rd_ptr () - incoming.base ();
+ size_t wr_pos = incoming.wr_ptr () - incoming.base ();
+ rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN;
this->dump_msg ("recv",
- ACE_reinterpret_cast (u_char *, ptr),
- len);
+ ACE_reinterpret_cast (u_char *, incoming.rd_ptr ()),
+ incoming.length ());
// Create a input CDR stream.
// NOTE: We use the same data block in which we read the message and
// we pass it on to the higher layers of the ORB. So we dont to any
// copies at all here. The same is also done in the higher layers.
- TAO_InputCDR input_cdr (data,
+ TAO_InputCDR input_cdr (incoming.data_block (),
+ ACE_Message_Block::DONT_DELETE,
rd_pos,
wr_pos,
byte_order,
+ this->message_state_.giop_version_.major,
+ this->message_state_.giop_version_.minor,
orb_core);
// Reset the message handler to receive upcalls if any
@@ -485,66 +431,32 @@ TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport,
int
TAO_GIOP_Message_Base::process_reply_message (
TAO_Pluggable_Reply_Params &params,
- ACE_Message_Block &incoming
+ ACE_Message_Block &incoming,
+ CORBA::Octet byte_order
)
{
- CORBA::Octet byte_order = 0;
- size_t rd_pos = 0;
- size_t wr_pos = 0;
- ACE_Data_Block *data = 0;
- size_t len = 0;
- char *ptr = 0;
- // At this point we have data in the queue or in the <incoming>
- // message block. If we have data in the queue we process that
- // first. At this point we are just assuming that (and it is a
- // pretty good assumption) that the data in <incoming> has already
- // been copied.
- if (this->message_queue_.queue_length ())
- {
- // As we have a message in the queue let us process that. We do
- // that by taking the data block off the queue and sticking it
- // in the <incoming> message block..
- data =
- this->message_queue_.get_current_message (byte_order);
-
- ptr = data->base ();
- // Set the read and write pointer positions
- // @@ What do we if we get Fragmented messages?
- rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN;
- len = wr_pos = data->size ();
- }
- else
- {
- // Get the read and write positions before we steal data.
- // @@ Bala:Need to change this
- rd_pos = incoming.rd_ptr () - incoming.base ();
- wr_pos = incoming.wr_ptr () - incoming.base ();
- rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN;
- byte_order = this->message_state_.byte_order_;
-
- data = incoming.data_block ()->duplicate ();
- len = incoming.length ();
- ptr = incoming.rd_ptr ();
- // Duplicate the data block
- // @@Do we need to ??
- // ACE_Data_Block *data =
- //incoming.data_block ()->duplicate ();
- }
+
+ // Get the read and write positions before we steal data.
+ size_t rd_pos = incoming.rd_ptr () - incoming.base ();
+ size_t wr_pos = incoming.wr_ptr () - incoming.base ();
+ rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN;
this->dump_msg ("recv",
- ACE_reinterpret_cast (u_char *, ptr),
- len);
+ ACE_reinterpret_cast (u_char *, incoming.rd_ptr ()),
+ incoming.length ());
- // Create a input CDR stream.
+ // Create a empty buffer on stack
// NOTE: We use the same data block in which we read the message and
// we pass it on to the higher layers of the ORB. So we dont to any
// copies at all here. The same is alos done in the higher layers.
- TAO_InputCDR input_cdr (data,
+ TAO_InputCDR input_cdr (incoming.data_block (),
+ ACE_Message_Block::DONT_DELETE,
rd_pos,
wr_pos,
+ this->message_state_.giop_version_.major,
+ this->message_state_.giop_version_.minor,
byte_order);
-
// Reset the message state. Now, we are ready for the next nested
// upcall if any.
// this->message_handler_.reset (0);
@@ -1205,6 +1117,8 @@ TAO_GIOP_Message_Base::send_reply_exception (
orb_core->output_cdr_dblock_allocator (),
orb_core->output_cdr_msgblock_allocator (),
orb_core->orb_params ()->cdr_memcpy_tradeoff (),
+ TAO_DEF_GIOP_MAJOR,
+ TAO_DEF_GIOP_MINOR,
orb_core->to_iso8859 (),
orb_core->to_unicode ());
diff --git a/TAO/tao/GIOP_Message_Base.h b/TAO/tao/GIOP_Message_Base.h
index dc2f9cf12e7..1ca368005fe 100644
--- a/TAO/tao/GIOP_Message_Base.h
+++ b/TAO/tao/GIOP_Message_Base.h
@@ -24,7 +24,7 @@
#include "tao/GIOP_Message_Generator_Parser_Impl.h"
#include "tao/GIOP_Utils.h"
#include "tao/GIOP_Message_State.h"
-#include "tao/Incoming_Message_Queue.h"
+
class TAO_Pluggable_Reply_Params;
@@ -107,17 +107,24 @@ public:
/// @@Bala:Documentation please..
virtual int is_message_complete (ACE_Message_Block &message_block);
+ /// @@Bala:Documentation please..
+ virtual size_t missing_data (ACE_Message_Block &message_block);
+
+ virtual CORBA::Octet byte_order (void);
+
/// Process the request message that we have received on the
/// connection
virtual int process_request_message (TAO_Transport *transport,
TAO_ORB_Core *orb_core,
- ACE_Message_Block &block);
+ ACE_Message_Block &block,
+ CORBA::Octet byte_order);
/// Parse the reply message that we received and return the reply
/// information though <reply_info>
virtual int process_reply_message (
TAO_Pluggable_Reply_Params &reply_info,
- ACE_Message_Block &block);
+ ACE_Message_Block &block,
+ CORBA::Octet byte_order);
/// Generate a reply message with the exception <ex>.
@@ -200,9 +207,6 @@ private:
/// incoming messages
TAO_GIOP_Message_State message_state_;
- /// @@Bala:Docu
- TAO_Incoming_Message_Queue message_queue_;
-
/// Output CDR
TAO_OutputCDR *output_;
diff --git a/TAO/tao/GIOP_Message_State.cpp b/TAO/tao/GIOP_Message_State.cpp
index 9a2401216a4..3171f3de61b 100644
--- a/TAO/tao/GIOP_Message_State.cpp
+++ b/TAO/tao/GIOP_Message_State.cpp
@@ -6,7 +6,7 @@
#include "tao/Pluggable.h"
#include "tao/debug.h"
#include "tao/GIOP_Message_Base.h"
-#include "Transport.h"
+//#include "Transport.h"
#if !defined (__ACE_INLINE__)
# include "tao/GIOP_Message_State.inl"
diff --git a/TAO/tao/Incoming_Message_Queue.cpp b/TAO/tao/Incoming_Message_Queue.cpp
index 13ecd6310f5..e5b239b775c 100644
--- a/TAO/tao/Incoming_Message_Queue.cpp
+++ b/TAO/tao/Incoming_Message_Queue.cpp
@@ -1,6 +1,6 @@
#include "Incoming_Message_Queue.h"
#include "ORB_Core.h"
-#include "GIOP_Message_State.h"
+#include "debug.h"
#if !defined (__ACE_INLINE__)
# include "Incoming_Message_Queue.inl"
@@ -22,98 +22,64 @@ TAO_Incoming_Message_Queue::~TAO_Incoming_Message_Queue (void)
}
int
-TAO_Incoming_Message_Queue::add_message (const ACE_Message_Block &block,
- const TAO_GIOP_Message_State &state)
+TAO_Incoming_Message_Queue::add_message (ACE_Message_Block *incoming,
+ size_t missing_data,
+ CORBA::Octet byte_order)
+
{
+ // Allocate memory for TAO_Queued_Data
+ TAO_Queued_Data *qd = this->get_node ();
- // Check whether the last message in the Queue has a half message
- if (this->size_ > 0 &&
- this->queued_data_->missing_data_ > 0)
+ if (qd == 0)
{
- // Create the message block
- ACE_Message_Block mb (this->queued_data_->data_block_);
+ if (TAO_debug_level > 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) Could not make a node \n")));
+ }
+ return -1;
+ }
+
+ // Set the data block
+ qd->msg_block_ = incoming;
- // Duplicate the message block so that the data block is not
- // deleted by the above message block
- this->queued_data_->data_block_->duplicate ();
+ // Set the byte_order
+ qd->byte_order_ = byte_order;
- // Set the write pointer in the message block so that we can
- // copy in the message block. To do that we need to calculate
- // the write pointer position.
- size_t wr_pos =
- this->queued_data_->data_block_->size () -
- this->queued_data_->missing_data_;
+ qd->missing_data_ = missing_data;
- mb.wr_ptr (wr_pos);
+ this->add_node (qd);
+
+ // increment the size of the list
+ ++this->size_;
+
+ return 1;
+}
- // If we have received more data than the missing data we copy
- // only the missing data. If we have received less then we copy
- // all the data. So we calculate how much of data to copy.
+void
+TAO_Incoming_Message_Queue::copy_message (ACE_Message_Block &block)
+{
+ if (this->size_ > 0)
+ {
size_t n = 0;
- if (block.length () >
- this->queued_data_->missing_data_)
+ if (block.length () <= this->queued_data_->missing_data_)
{
- n = this->queued_data_->missing_data_;
+ n = block.length ();
}
else
{
- n = block.length ();
+ n = this->queued_data_->missing_data_;
}
- // Now do the copy of the missing data.
- mb.copy (block.rd_ptr (),
- n);
-
- // Now that we have copied n bytes, let us decrease the
- // <missing_data_> by n.
+ this->queued_data_->msg_block_->copy (block.rd_ptr (),
+ n);
this->queued_data_->missing_data_ -= n;
-
}
- else
- {
- // Create a data block of size of the incoming message
- // @@Bala: Hard coding??
- ACE_Data_Block *db =
- this->orb_core_->data_block_for_message_block (state.message_size ());
-
- // Make a message block with the above data_block
- ACE_Message_Block mb (db);
-
- // Increment the ref count so that the data block is not deleted
- // when the message block goes out of scope. This is necessary as we
- // have created the message block on the stack
- db->duplicate ();
-
- // Do a copy of the message in to the above data block
- mb.copy (block.rd_ptr (),
- block.length ());
-
- // Allocate memory for TAO_Queued_Data
- TAO_Queued_Data *qd = this->get_node ();
-
- // Set the data block
- qd->data_block_ = db;
-
- // Set the byte_order
- qd->byte_order_ = state.byte_order ();
-
- if (state.message_size () <= block.length ())
- qd->missing_data_ = 0;
- else
- qd->missing_data_ = state.message_size () - block.length ();
-
- this->add_node (qd);
-
- // increment the size of the list
- ++this->size_;
- }
-
- return 1;
}
-ACE_Data_Block *
-TAO_Incoming_Message_Queue::get_current_message (CORBA::Octet &byte_order)
+ACE_Message_Block *
+TAO_Incoming_Message_Queue::dequeue_head (CORBA::Octet &byte_order)
{
TAO_Queued_Data *tmp =
this->queued_data_->next_;
@@ -121,9 +87,8 @@ TAO_Incoming_Message_Queue::get_current_message (CORBA::Octet &byte_order)
if (tmp->missing_data_ != 0)
return 0;
- ACE_Data_Block *db =
- tmp->data_block_;
-
+ ACE_Message_Block *db =
+ tmp->msg_block_;
this->queued_data_->next_ = tmp->next_;
byte_order = tmp->byte_order_;
diff --git a/TAO/tao/Incoming_Message_Queue.h b/TAO/tao/Incoming_Message_Queue.h
index ef29b6ac7e1..ffe0f379e1d 100644
--- a/TAO/tao/Incoming_Message_Queue.h
+++ b/TAO/tao/Incoming_Message_Queue.h
@@ -23,8 +23,8 @@
/// Forward declarations
class ACE_Data_Block;
class TAO_ORB_Core;
-class TAO_GIOP_Message_State;
class TAO_Queued_Data;
+class TAO_Transport;
/**
* @class TAO_Incoming_Message_Queue
@@ -42,21 +42,26 @@ public:
/// @@Bala:Docu
- int add_message (const ACE_Message_Block &block,
- const TAO_GIOP_Message_State &state);
+ int add_message (ACE_Message_Block *block,
+ size_t missing_data,
+ CORBA::Octet byte_order);
- const ACE_Data_Block *current_message (void) const;
- const CORBA::Octet current_byte_order (void) const;
+ void copy_message (ACE_Message_Block &block);
CORBA::ULong queue_length (void);
- int complete_message (void);
+ int is_complete_message (void);
+ size_t missing_data (void) const;
+ void missing_data (size_t data);
- ACE_Data_Block *get_current_message (CORBA::Octet &byte_order);
+ char *wr_ptr (void) const;
+ ACE_Message_Block *dequeue_head (CORBA::Octet &byte_order);
private:
+ friend class TAO_Transport;
+
/// @@Bala:Docu
class TAO_Export TAO_Queued_Data
{
@@ -64,7 +69,7 @@ private:
TAO_Queued_Data (void);
/// The actual message queue
- ACE_Data_Block *data_block_;
+ ACE_Message_Block *msg_block_;
CORBA::ULong missing_data_;
diff --git a/TAO/tao/Incoming_Message_Queue.inl b/TAO/tao/Incoming_Message_Queue.inl
index 8bb81f3a247..10713dcf732 100644
--- a/TAO/tao/Incoming_Message_Queue.inl
+++ b/TAO/tao/Incoming_Message_Queue.inl
@@ -1,31 +1,32 @@
// -*- C++ -*-
//$Id$
-ACE_INLINE const ACE_Data_Block *
-TAO_Incoming_Message_Queue::current_message (void) const
+ACE_INLINE CORBA::ULong
+TAO_Incoming_Message_Queue::queue_length (void)
{
-
- return this->queued_data_->data_block_;
+ return this->size_;
}
-ACE_INLINE const CORBA::Octet
-TAO_Incoming_Message_Queue::current_byte_order (void) const
+ACE_INLINE int
+TAO_Incoming_Message_Queue::is_complete_message (void)
{
- return this->queued_data_->byte_order_;
-}
+ if (this->size_ != 0 &&
+ this->queued_data_->missing_data_ == 0)
+ return 0;
+ return 1;
+}
-ACE_INLINE CORBA::ULong
-TAO_Incoming_Message_Queue::queue_length (void)
+ACE_INLINE char *
+TAO_Incoming_Message_Queue::wr_ptr (void) const
{
- return this->size_;
+ return this->queued_data_->msg_block_->wr_ptr ();
}
-ACE_INLINE int
-TAO_Incoming_Message_Queue::complete_message (void)
+ACE_INLINE size_t
+TAO_Incoming_Message_Queue::missing_data (void) const
{
- if (this->size_ != 0 &&
- this->queued_data_->next_->missing_data_ == 0)
- return 1;
+ if (this->size_ != 0)
+ return this->queued_data_->missing_data_;
return 0;
}
@@ -46,7 +47,7 @@ TAO_Incoming_Message_Queue::get_node (void)
ACE_INLINE
TAO_Incoming_Message_Queue::TAO_Queued_Data::TAO_Queued_Data (void)
- : data_block_ (0),
+ : msg_block_ (0),
missing_data_ (0),
byte_order_ (0),
next_ (0)
diff --git a/TAO/tao/LIST_OF_TODO b/TAO/tao/LIST_OF_TODO
index d73ac6b0c36..7e33a70257a 100644
--- a/TAO/tao/LIST_OF_TODO
+++ b/TAO/tao/LIST_OF_TODO
@@ -2,4 +2,10 @@
- test for Muli-threaded client & server
- test LongUpCalls
- Russle Moores test
-- Pass data of different sizes from teh client to the server \ No newline at end of file
+- Pass data of different sizes from teh client to the server
+- Problem with two messages coming in two diffrent GIOP versions..
+- Remove allocation of data block for reply handlers. Can use
+ stack based stuff
+- Remove the allocation of data blocks in Reply_Params.
+- AMI tests
+- Big_Twoways.. \ No newline at end of file
diff --git a/TAO/tao/ORB.cpp b/TAO/tao/ORB.cpp
index fb0bde4a745..cf5ca61a9e2 100644
--- a/TAO/tao/ORB.cpp
+++ b/TAO/tao/ORB.cpp
@@ -1836,6 +1836,8 @@ CORBA_ORB::object_to_string (CORBA::Object_ptr obj,
this->orb_core_->output_cdr_dblock_allocator (),
this->orb_core_->output_cdr_msgblock_allocator (),
this->orb_core_->orb_params ()->cdr_memcpy_tradeoff (),
+ TAO_DEF_GIOP_MAJOR,
+ TAO_DEF_GIOP_MINOR,
this->orb_core_->to_iso8859 (),
this->orb_core_->to_unicode ());
@@ -2030,7 +2032,11 @@ CORBA_ORB::ior_string_to_object (const char *str,
int byte_order = *(mb.rd_ptr ());
mb.rd_ptr (1);
mb.wr_ptr (len);
- TAO_InputCDR stream (&mb, byte_order, this->orb_core_);
+ TAO_InputCDR stream (&mb,
+ byte_order,
+ TAO_DEF_GIOP_MAJOR,
+ TAO_DEF_GIOP_MINOR,
+ this->orb_core_);
CORBA::Object_ptr objref = CORBA::Object::_nil ();
stream >> objref;
diff --git a/TAO/tao/ORB_Core.cpp b/TAO/tao/ORB_Core.cpp
index a88a05e50be..5b502880ec0 100644
--- a/TAO/tao/ORB_Core.cpp
+++ b/TAO/tao/ORB_Core.cpp
@@ -2754,6 +2754,7 @@ TAO_ORB_Core::create_input_cdr_data_block (size_t size)
lock_strategy);
}
+
ACE_Data_Block *
TAO_ORB_Core::data_block_for_message_block (size_t size)
{
diff --git a/TAO/tao/ORB_Core.h b/TAO/tao/ORB_Core.h
index 8fce4325e8a..7421d5947fe 100644
--- a/TAO/tao/ORB_Core.h
+++ b/TAO/tao/ORB_Core.h
@@ -452,6 +452,9 @@ public:
ACE_Data_Block *create_input_cdr_data_block (size_t size);
+ /// Return the locking strategy used for the data blocks.
+ ACE_Lock *locking_strategy (void);
+
/// The data blocks returned have memeory from the global pool. Will
/// not get anything from the TSS even if it is available.
ACE_Data_Block *data_block_for_message_block (size_t size);
diff --git a/TAO/tao/ORB_Core.i b/TAO/tao/ORB_Core.i
index c564ccec09b..1b3fa6bccc7 100644
--- a/TAO/tao/ORB_Core.i
+++ b/TAO/tao/ORB_Core.i
@@ -22,6 +22,15 @@ TAO_ORB_Core::_decr_refcnt (void)
return 0;
}
+ACE_INLINE ACE_Lock *
+TAO_ORB_Core::locking_strategy (void)
+{
+ if (this->resource_factory ()->use_locked_data_blocks ())
+ return &this->data_block_lock_;
+
+ return 0;
+}
+
ACE_INLINE TAO_Transport_Cache_Manager *
TAO_ORB_Core::transport_cache (void)
{
diff --git a/TAO/tao/Pluggable_Messaging.h b/TAO/tao/Pluggable_Messaging.h
index 9164611ae8e..288d946999e 100644
--- a/TAO/tao/Pluggable_Messaging.h
+++ b/TAO/tao/Pluggable_Messaging.h
@@ -126,17 +126,23 @@ public:
/// @@Bala: Documentation please...
virtual int is_message_complete (ACE_Message_Block &message_block) = 0;
+ virtual size_t missing_data (ACE_Message_Block &incoming) = 0;
+
+ virtual CORBA::Octet byte_order (void) = 0;
+
/// Parse the request message, make an upcall and send the reply back
/// to the "request initiator"
virtual int process_request_message (TAO_Transport *transport,
TAO_ORB_Core *orb_core,
- ACE_Message_Block &m) = 0;
+ ACE_Message_Block &m,
+ CORBA::Octet byte_order) = 0;
/// Parse the reply message that we received and return the reply
/// information though <reply_info>
virtual int process_reply_message (
TAO_Pluggable_Reply_Params &reply_info,
- ACE_Message_Block &m) = 0;
+ ACE_Message_Block &m,
+ CORBA::Octet byte_order) = 0;
/// Generate a reply message with the exception <ex>.
virtual int generate_exception_reply (
diff --git a/TAO/tao/Synch_Reply_Dispatcher.cpp b/TAO/tao/Synch_Reply_Dispatcher.cpp
index 70c17a67629..d6844859755 100644
--- a/TAO/tao/Synch_Reply_Dispatcher.cpp
+++ b/TAO/tao/Synch_Reply_Dispatcher.cpp
@@ -61,20 +61,9 @@ TAO_Synch_Reply_Dispatcher::dispatch_reply (
// dispatcher is used because the request must be re-sent.
//this->message_state_.reset (0);
- // Steal the buffer so that no copying is done.
- this->reply_cdr_.exchange_data_blocks (params.input_cdr_);
-
- /*if (&this->message_state_ != message_state)
- {
- // The Transport Mux Strategy did not use our Message_State to
- // receive the event, possibly because it is muxing multiple
- // requests over the same connection.
-
- // Steal the buffer so that no copying is done.
- this->message_state_.cdr.steal_from (message_state->cdr);
-
- // There is no need to copy the other fields!
- }*/
+ // Transfer the <params.input_cdr_>'s content to this->reply_cdr_
+ ACE_Data_Block *db =
+ this->reply_cdr_.clone_from (params.input_cdr_);
if (this->wait_strategy_ != 0)
{
@@ -91,12 +80,6 @@ TAO_Synch_Reply_Dispatcher::dispatch_reply (
return 1;
}
-/*TAO_GIOP_Message_State *
-TAO_Synch_Reply_Dispatcher::message_state (void)
-{
- return &this->message_state_;
-}*/
-
void
TAO_Synch_Reply_Dispatcher::dispatcher_bound (TAO_Transport *transport)
{
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp
index 8e95e7f3783..e0907921053 100644
--- a/TAO/tao/Transport.cpp
+++ b/TAO/tao/Transport.cpp
@@ -67,6 +67,7 @@ TAO_Transport::TAO_Transport (CORBA::ULong tag,
, bidirectional_flag_ (-1)
, head_ (0)
, tail_ (0)
+ , incoming_message_queue_ (orb_core)
, current_deadline_ (ACE_Time_Value::zero)
, flush_timer_id_ (-1)
, transport_timer_ (this)
@@ -733,7 +734,37 @@ TAO_Transport::recv (char *buffer,
return -1;
// now call the template method
- return this->recv_i (buffer, len, timeout);
+ ssize_t n =
+ this->recv_i (buffer, len, timeout);
+
+ // Most of the errors handling is common for
+ // Now the message has been read
+ if (n == -1 && TAO_debug_level > 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - %p \n"),
+ ACE_TEXT ("TAO - read message failure \n")
+ ACE_TEXT ("TAO - handle_input () \n")));
+ }
+
+ // Error handling
+ if (n == -1)
+ {
+ if (errno == EWOULDBLOCK)
+ return 0;
+
+ // Close the connection
+ this->tms_->connection_closed ();
+
+ return -1;
+ }
+ // @@ What are the other error handling here??
+ else if (n == 0)
+ {
+ return -1;
+ }
+
+ return n;
}
@@ -795,9 +826,20 @@ TAO_Transport::handle_input_i (ACE_HANDLE h,
sizeof buf);
#endif /* ACE_HAS_PURIFY */
+ // Create a data block
+ ACE_Data_Block db (sizeof (buf),
+ ACE_Message_Block::MB_DATA,
+ buf,
+ this->orb_core_->message_block_buffer_allocator (),
+ this->orb_core_->locking_strategy (),
+ ACE_Message_Block::DONT_DELETE,
+ this->orb_core_->message_block_dblock_allocator ());
+
// Create a message block
- ACE_Message_Block message_block (buf,
- sizeof (buf));
+ ACE_Message_Block message_block (&db,
+ ACE_Message_Block::DONT_DELETE,
+ this->orb_core_->message_block_msgblock_allocator ());
+
// Align the message block
ACE_CDR::mb_align (&message_block);
@@ -809,37 +851,53 @@ TAO_Transport::handle_input_i (ACE_HANDLE h,
message_block.space (),
max_wait_time);
- // Now the message has been read
- if (n == -1 && TAO_debug_level > 0)
+
+ if (n <= 0)
+ return n;
+
+
+ // Set the write pointer in the stack buffer
+ message_block.wr_ptr (n);
+
+ if (this->parse_incoming_messages (message_block) == -1)
+ return -1;
+
+ // Check whether we have a complete message for processing
+ size_t missing_data =
+ this->missing_data (message_block);
+
+ if (missing_data)
{
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - %p \n"),
- ACE_TEXT ("TAO - read message failure \n")
- ACE_TEXT ("TAO - handle_input () \n")));
+ return this->consolidate_message (message_block,
+ missing_data,
+ h,
+ max_wait_time);
}
- // Error handling
- if (n == -1)
- {
- if (errno == EWOULDBLOCK)
- return 0;
- // Close the connection
- this->tms_->connection_closed ();
+ // @@Bala:
+ return this->process_parsed_messages (
+ message_block,
+ this->messaging_object ()->byte_order (),
+ h);
+}
- return -1;
- }
- // @@ What are the other error handling here??
- else if (n == 0)
+
+
+int
+TAO_Transport::parse_incoming_messages (ACE_Message_Block &message_block)
+{
+ // If we have a queue and if the last message is not complete a
+ // complete one, then this read will get us the remaining data. So
+ // do not try to parse the header if we have an incomplete message
+ // in the queue.
+ if (!this->incoming_message_queue_.is_complete_message ())
{
- return -1;
+ return 0;
}
- // Set the write pointer in the stack buffer
- message_block.wr_ptr (n);
-
- // Now that the message has been read, process the message. Call the
- // transport object to do the processing.
+ // Now that a new message has been read, process the message. Call
+ // the messaging object to do the parsing
if (this->messaging_object ()->parse_incoming_messages (message_block) == -1)
{
if (TAO_debug_level > 0)
@@ -851,28 +909,154 @@ TAO_Transport::handle_input_i (ACE_HANDLE h,
return -1;
}
- // @@Bala:
- return this->process_parsed_messages (message_block, h);
+ return 0;
}
+
+size_t
+TAO_Transport::missing_data (ACE_Message_Block &incoming)
+{
+ // If we have message in the queue then find out how much of data
+ // is required to get a complete message
+ if (this->incoming_message_queue_.queue_length ())
+ {
+ return this->incoming_message_queue_.missing_data ();
+ }
+
+ return this->messaging_object ()->missing_data (incoming);
+}
+
+
+int
+TAO_Transport::consolidate_message (ACE_Message_Block &incoming,
+ size_t missing_data,
+ ACE_HANDLE h,
+ ACE_Time_Value *max_wait_time)
+{
+ // The write pointer which will be used for reading data from the
+ // socket.
+ char *wr_ptr = 0;
+ if (!this->incoming_message_queue_.is_complete_message ())
+ {
+ return this->consolidate_message_queue (incoming,
+ missing_data,
+ h,
+ max_wait_time);
+ }
+
+ // Calculate the actual length of the load that we are supposed to
+ // read which is equal to the <missing_data> + length of the buffer
+ // that we have..
+ size_t payload = missing_data + incoming.length ();
+
+ // Grow the buffer to the size of the message
+ ACE_CDR::grow (&incoming,
+ payload);
+
+ // .. do a read on the socket again.
+ ssize_t n = this->recv (incoming.wr_ptr (),
+ missing_data,
+ max_wait_time);
+
+ // If we got an EWOULDBLOCK or some other error..
+ if (n <= 0)
+ return n;
+
+ // Move the write pointer
+ incoming.wr_ptr (n);
+
+ // ..Decrement
+ missing_data -= n;
+
+ // Get the byte order information
+ CORBA::Octet byte_order =
+ this->messaging_object ()->byte_order ();
+
+ if (missing_data > 0)
+ {
+ // Duplicate the message block
+ ACE_Message_Block *mb =
+ incoming.duplicate ();
+
+ // Stick the message in queue with the byte order information
+ if (this->incoming_message_queue_.add_message (mb,
+ missing_data,
+ byte_order) == -1)
+ {
+ return -1;
+ }
+ return 0;
+ }
+
+ // Now we have a full message in our buffer. Just go ahead and
+ // process that
+ return this->process_parsed_messages (incoming,
+ byte_order,
+ h);
+}
+
+int
+TAO_Transport::consolidate_message_queue (ACE_Message_Block &incoming,
+ size_t missing_data,
+ ACE_HANDLE h,
+ ACE_Time_Value *max_wait_time)
+{
+ // If the queue did not have a complete message put this piece of
+ // message in the queue
+ this->incoming_message_queue_.copy_message (incoming);
+ missing_data = this->incoming_message_queue_.missing_data ();
+
+ if (missing_data > 0)
+ {
+ // Read the message into the last node of the message queue..
+ ssize_t n = this->recv (this->incoming_message_queue_.wr_ptr (),
+ missing_data,
+ max_wait_time);
+
+ // Error...
+ if (n <= 0)
+ return n;
+
+ // Move the write pointer
+ incoming.wr_ptr (n);
+
+ // Decrement the missing data
+ this->incoming_message_queue_.queued_data_->missing_data_ -= n;
+ }
+
+
+ if (!this->incoming_message_queue_.is_complete_message ())
+ {
+ return 0;
+ }
+
+ CORBA::Octet byte_order = 0;
+
+ // Get the message on the head of the queue..
+ ACE_Message_Block *msg_block =
+ this->incoming_message_queue_.dequeue_head (byte_order);
+
+ // Process the message...
+ if (this->process_parsed_messages (*msg_block,
+ byte_order,
+ h) == -1)
+ return -1;
+
+ // Delete the message block...
+ delete msg_block;
+
+ return 0;
+}
int
TAO_Transport::process_parsed_messages (ACE_Message_Block &message_block,
+ CORBA::Octet byte_order,
ACE_HANDLE h)
{
- // Check whether we have a complete message for processing
- int retval =
- this->messaging_object ()->is_message_complete (message_block);
-
// If we have a complete message, just resume the handler
// Resume the handler.
// @@Bala: Try to solve this issue of reactor resumptions..
this->orb_core_->reactor ()->resume_handler (h);
- // If we dont have a complete message then just return 1 to the
- // reactor so that we can be called back for further reading
- if (!retval)
- return 1;
-
// Get the <message_type> that we have received
TAO_Pluggable_Message_Type t =
this->messaging_object ()->message_type ();
@@ -898,7 +1082,8 @@ TAO_Transport::process_parsed_messages (ACE_Message_Block &message_block,
if (this->messaging_object ()->process_request_message (
this,
this->orb_core (),
- message_block) == -1)
+ message_block,
+ byte_order) == -1)
{
// Close the TMS
this->tms_->connection_closed ();
@@ -915,7 +1100,8 @@ TAO_Transport::process_parsed_messages (ACE_Message_Block &message_block,
TAO_Pluggable_Reply_Params params (this->orb_core ());
if (this->messaging_object ()->process_reply_message (params,
- message_block) == -1)
+ message_block,
+ byte_order) == -1)
{
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG,
diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h
index 2a65caec6c2..64720a8c501 100644
--- a/TAO/tao/Transport.h
+++ b/TAO/tao/Transport.h
@@ -19,15 +19,18 @@
#include "ace/pre.h"
#include "corbafwd.h"
+
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
#include "Exception.h"
#include "Transport_Descriptor_Interface.h"
#include "Transport_Cache_Manager.h"
#include "Transport_Timer.h"
#include "ace/Strategies.h"
-
-#if !defined (ACE_LACKS_PRAGMA_ONCE)
-# pragma once
-#endif /* ACE_LACKS_PRAGMA_ONCE */
+#include "Incoming_Message_Queue.h"
class TAO_ORB_Core;
class TAO_Target_Specification;
@@ -564,7 +567,25 @@ protected:
virtual void transition_handler_state_i (void) = 0;
/// @@ Bala: Documentation
+ int parse_incoming_messages (ACE_Message_Block &message_block);
+
+ size_t missing_data (ACE_Message_Block &message_block);
+
+ int check_message_integrity (ACE_Message_Block &message_block);
+
+ int consolidate_message (ACE_Message_Block &incoming,
+ size_t missing_data,
+ ACE_HANDLE h,
+ ACE_Time_Value *max_wait_time);
+
+ int consolidate_message_queue (ACE_Message_Block &incoming,
+ size_t missing_data,
+ ACE_HANDLE h,
+ ACE_Time_Value *max_wait_time);
+
+ /// @@ Bala: Documentation
virtual int process_parsed_messages (ACE_Message_Block &message_block,
+ CORBA::Octet byte_order,
ACE_HANDLE h = ACE_INVALID_HANDLE);
public:
@@ -769,6 +790,9 @@ protected:
TAO_Queued_Message *head_;
TAO_Queued_Message *tail_;
+ /// @@Bala: Docu??
+ TAO_Incoming_Message_Queue incoming_message_queue_;
+
/// The queue will start draining no later than <queing_deadline_>
/// *if* the deadline is
ACE_Time_Value current_deadline_;