diff options
author | bala <balanatarajan@users.noreply.github.com> | 2001-06-17 23:11:02 +0000 |
---|---|---|
committer | bala <balanatarajan@users.noreply.github.com> | 2001-06-17 23:11:02 +0000 |
commit | 6ceb20f11b609d45c6bc5687808178b4bd62e826 (patch) | |
tree | b0d8856c3506c1bdc0e893e1766fc9187f9cc8ff | |
parent | 735b2e55f740e7183aad0e4877e5b68f1bbe5845 (diff) | |
download | ATCD-6ceb20f11b609d45c6bc5687808178b4bd62e826.tar.gz |
ChangeLogTag:Sat Jun 17 17:46:23 2001 Balachandran Natarajan <bala@cs.wustl.edu>
-rw-r--r-- | TAO/ChangeLogs/ChangeLog-02a | 73 | ||||
-rw-r--r-- | TAO/tao/Asynch_Reply_Dispatcher.cpp | 12 | ||||
-rw-r--r-- | TAO/tao/DynamicInterface/DII_Reply_Dispatcher.cpp | 5 | ||||
-rw-r--r-- | TAO/tao/GIOP_Message_Base.cpp | 196 | ||||
-rw-r--r-- | TAO/tao/GIOP_Message_Base.h | 16 | ||||
-rw-r--r-- | TAO/tao/GIOP_Message_State.cpp | 2 | ||||
-rw-r--r-- | TAO/tao/Incoming_Message_Queue.cpp | 119 | ||||
-rw-r--r-- | TAO/tao/Incoming_Message_Queue.h | 21 | ||||
-rw-r--r-- | TAO/tao/Incoming_Message_Queue.inl | 35 | ||||
-rw-r--r-- | TAO/tao/LIST_OF_TODO | 8 | ||||
-rw-r--r-- | TAO/tao/ORB.cpp | 8 | ||||
-rw-r--r-- | TAO/tao/ORB_Core.cpp | 1 | ||||
-rw-r--r-- | TAO/tao/ORB_Core.h | 3 | ||||
-rw-r--r-- | TAO/tao/ORB_Core.i | 9 | ||||
-rw-r--r-- | TAO/tao/Pluggable_Messaging.h | 10 | ||||
-rw-r--r-- | TAO/tao/Synch_Reply_Dispatcher.cpp | 23 | ||||
-rw-r--r-- | TAO/tao/Transport.cpp | 264 | ||||
-rw-r--r-- | TAO/tao/Transport.h | 32 |
18 files changed, 509 insertions, 328 deletions
diff --git a/TAO/ChangeLogs/ChangeLog-02a b/TAO/ChangeLogs/ChangeLog-02a index 41695602f91..cff70c321ef 100644 --- a/TAO/ChangeLogs/ChangeLog-02a +++ b/TAO/ChangeLogs/ChangeLog-02a @@ -1,3 +1,76 @@ +Sat Jun 17 17:46:23 2001 Balachandran Natarajan <bala@cs.wustl.edu> + + This set of changes comes with complete revamping of the previous + design. The flaws with the previous design were as follows + (1) We were unnecessarily penalising large data blocks. We were + trying to read a particular size of data till the data was + completely removed from the socket. This was totally + ridiculous because we were doing more reads than required. + + (2) The message block that was constructed on the stack with a + buffer from stack never did what we wanted. It was allocating + a data block on the heap and was thus spoiling whatever + optimization that we had tried putting in. + + (3) The incoming message Queue is now managed by the TAO_Transport + object instead of the GIOP classes. + + * tao/GIOP_Message_Base.cpp: + * tao/GIOP_Message_Base.h: Removed the references to the incoming + message queue. Implemention for two methods missing_data () and + byte_order (). Added an extra argument to the methods + process_request_message () and process_reply_message (). Used + the incoming message block to create a input CDR with the + DONT_DELETE flag so that the data block is not deleted after + request processing. + + * tao/GIOP_Message_State.cpp: Removed the inclusion of + Transport.h. + + * tao/Incoming_Message_Queue.h: + * tao/Incoming_Message_Queue.cpp: + * tao/Incoming_Message_Queue.inl: Added an argument to the + add_message (). Further the implementation of add_message () has + changed a bit. It now adds only a new message to the queue. It + doesn't modify a half filled queue. The TAO_Transport object + does that job. So declared the TAO_Transport as the friend class + of the Incoming_Message_Queue. + + Changed the name of the methods complete_message () as + is_complete_message (). Removed the methods current_message ()& + current_byte_order (). + + Added a new method copy_message () which copies messages into + the half empty nodes. + + Added a method wr_ptr () to access the write pointer of the tail + node that has halfempty message. + + * tao/Transport.cpp: + * tao/Transport.h: The Incoming Message Queue is now managed by + this class. Added the following methods + - missing_data () + - parse_incoming_messages () + - check_message_integrity () + - consolidate_message () + - conslodate_message_queue () + + * tao/Pluggable_Messaging.h: Added two new virtual functions + missing_data () and byte_order (). + + * tao/ORB_Core.h: + * tao/ORB_Core.cpp: + * tao/ORB_Core.i: Added an accessor for the locking_strategy used + for the CDR blocks. + + * tao/Synch_Reply_Dispatcher.cpp: + * tao/Asynch_Reply_Dispatcher.cpp: + * tao/DynamicInterface/DII_Reply_Dispatcher.cpp: Changed the + exchange_data_block () to clone_from () which is a new method in + ACE_InputCDR. + + * tao/LIST_OF_TODO: Updated the list + Sat Jun 16 15:49:23 2001 Balachandran Natarajan <bala@cs.wustl.edu> * tao/Any.cpp: diff --git a/TAO/tao/Asynch_Reply_Dispatcher.cpp b/TAO/tao/Asynch_Reply_Dispatcher.cpp index b4a20a82253..110df977c28 100644 --- a/TAO/tao/Asynch_Reply_Dispatcher.cpp +++ b/TAO/tao/Asynch_Reply_Dispatcher.cpp @@ -48,11 +48,6 @@ TAO_Asynch_Reply_Dispatcher_Base::dispatch_reply ( return 0; } -/*TAO_GIOP_Message_State * -TAO_Asynch_Reply_Dispatcher_Base::message_state (void) -{ - return this->message_state_; -} */ void TAO_Asynch_Reply_Dispatcher_Base::dispatcher_bound (TAO_Transport *) @@ -113,10 +108,9 @@ TAO_Asynch_Reply_Dispatcher::dispatch_reply ( this->reply_status_ = params.reply_status_; - // this->message_state_ = message_state; - - // Steal the buffer so that no copying is done. - this->reply_cdr_.exchange_data_blocks (params.input_cdr_); + // Transfer the <params.input_cdr_>'s content to this->reply_cdr_ + ACE_Data_Block *db = + this->reply_cdr_.clone_from (params.input_cdr_); // Steal the buffer, that way we don't do any unnecesary copies of // this data. diff --git a/TAO/tao/DynamicInterface/DII_Reply_Dispatcher.cpp b/TAO/tao/DynamicInterface/DII_Reply_Dispatcher.cpp index 5471cd4d1f3..b4449b1deaf 100644 --- a/TAO/tao/DynamicInterface/DII_Reply_Dispatcher.cpp +++ b/TAO/tao/DynamicInterface/DII_Reply_Dispatcher.cpp @@ -44,8 +44,9 @@ TAO_DII_Deferred_Reply_Dispatcher::dispatch_reply ( { this->reply_status_ = params.reply_status_; - // Steal the buffer so that no copying is done. - this->reply_cdr_.steal_from (params.input_cdr_); + // Transfer the <params.input_cdr_>'s content to this->reply_cdr_ + ACE_Data_Block *db = + this->reply_cdr_.clone_from (params.input_cdr_); // Steal the buffer, that way we don't do any unnecesary copies of // this data. diff --git a/TAO/tao/GIOP_Message_Base.cpp b/TAO/tao/GIOP_Message_Base.cpp index cbe8b53ea27..ab01a84ac98 100644 --- a/TAO/tao/GIOP_Message_Base.cpp +++ b/TAO/tao/GIOP_Message_Base.cpp @@ -21,7 +21,6 @@ TAO_GIOP_Message_Base::TAO_GIOP_Message_Base (TAO_ORB_Core *orb_core, size_t /*input_cdr_size*/) : message_state_ (orb_core, this), - message_queue_ (orb_core), output_ (0), generator_parser_ (0) { @@ -38,6 +37,8 @@ TAO_GIOP_Message_Base::TAO_GIOP_Message_Base (TAO_ORB_Core *orb_core, orb_core->message_block_dblock_allocator (), orb_core->message_block_msgblock_allocator (), orb_core->orb_params ()->cdr_memcpy_tradeoff (), + TAO_DEF_GIOP_MAJOR, + TAO_DEF_GIOP_MINOR, orb_core->to_iso8859 (), orb_core->to_unicode ())); } @@ -315,13 +316,7 @@ TAO_GIOP_Message_Base::message_type (void) int TAO_GIOP_Message_Base::parse_incoming_messages (ACE_Message_Block &incoming) { - // If we have a queue and if the last message is not complete a - // complete one, then this read will get us the remaining data. So - // do not try to parse the header if we have an incomplete message - // in the queue. - if (this->message_queue_.queue_length () > 0 && - this->message_queue_.complete_message () == 0) - return 0; + if (this->message_state_.parse_message_header (incoming) == -1) { @@ -334,48 +329,36 @@ TAO_GIOP_Message_Base::parse_incoming_messages (ACE_Message_Block &incoming) return 0; } -int -TAO_GIOP_Message_Base::is_message_complete (ACE_Message_Block &incoming) +size_t +TAO_GIOP_Message_Base::missing_data (ACE_Message_Block &incoming) { - // If we have atleast one message in the message queue we just add - // the message to the queue - if (this->message_queue_.queue_length ()) - { - // Add the new message to the Queue - this->message_queue_.add_message (incoming, - this->message_state_); - - // We could have gotten the rest of the message of a previous - // incomplete read. We query to figure whether we have a - // complete message... - return this->message_queue_.complete_message (); - } - - size_t len = incoming.length (); - - len -= TAO_GIOP_MESSAGE_HEADER_LEN; - + // @@Bala: Look for fragmentation here.. // If we had recd. fragmented messages and if the GIOP minor version // is greater than 1, then include the FRAGMENT HEADER to calculate // the effective length of the message - if (this->message_state_.more_fragments_ && + /*if (this->message_state_.more_fragments_ && this->message_state_.giop_version_.minor > 1) - len -= TAO_GIOP_MESSAGE_FRAGMENT_HEADER; + len -= TAO_GIOP_MESSAGE_FRAGMENT_HEADER; + */ + + size_t len = incoming.length (); + + if (len >= this->message_state_.message_size ()) + return 0; + + return this->message_state_.message_size () - len; +} + +CORBA::Octet +TAO_GIOP_Message_Base::byte_order (void) +{ + return this->message_state_.byte_order_; +} + +int +TAO_GIOP_Message_Base::is_message_complete (ACE_Message_Block &incoming) +{ - if (len == this->message_state_.message_size_) - return 1; - else - { - // If we have a bigger or smaller message we add the message to - // the queue. - this->message_queue_.add_message (incoming, - this->message_state_); - - // We could have gotten the rest of the message of a previous - // incomplete read. We query to figure whether we have a - // complete message... - return this->message_queue_.complete_message (); - } // @@Bala: Implement other cases return 0; @@ -384,7 +367,8 @@ TAO_GIOP_Message_Base::is_message_complete (ACE_Message_Block &incoming) int TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport, TAO_ORB_Core *orb_core, - ACE_Message_Block &incoming) + ACE_Message_Block &incoming, + CORBA::Octet byte_order) { // Set the upcall thread orb_core->lf_strategy ().set_upcall_thread (orb_core->leader_follower ()); @@ -393,64 +377,26 @@ TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport, // @@@@Is it necessary here? this->output_->reset (); - CORBA::Octet byte_order = 0; - size_t rd_pos = 0; - size_t wr_pos = 0; - ACE_Data_Block *data = 0; - size_t len = 0; - char *ptr = 0; - // At this point we have data in the queue or in the <incoming> - // message block. If we have data in the queue we process that - // first. At this point we are just assuming that (and it is a - // pretty good assumption) that the data in <incoming> has already - // been copied. - if (this->message_queue_.queue_length ()) - { - // As we have a message in the queue let us process that. We do - // that by taking the data block off the queue and sticking it - // in the <incoming> message block.. - data = - this->message_queue_.get_current_message (byte_order); - - ptr = data->base (); - // Set the read and write pointer positions - // @@ What do we if we get Fragmented messages? - rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN; - len = wr_pos = data->size (); - - } - else - { - // Get the read and write positions before we steal data. - // @@ Bala:Need to change this - rd_pos = incoming.rd_ptr () - incoming.base (); - wr_pos = incoming.wr_ptr () - incoming.base (); - rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN; - byte_order = this->message_state_.byte_order_; - - data = incoming.data_block ()->duplicate (); - - len = incoming.length (); - ptr = incoming.rd_ptr (); - // Duplicate the data block - // @@Do we need to ?? - // ACE_Data_Block *data = - //incoming.data_block ()->duplicate (); - } - + // Get the read and write positions before we steal data. + size_t rd_pos = incoming.rd_ptr () - incoming.base (); + size_t wr_pos = incoming.wr_ptr () - incoming.base (); + rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN; this->dump_msg ("recv", - ACE_reinterpret_cast (u_char *, ptr), - len); + ACE_reinterpret_cast (u_char *, incoming.rd_ptr ()), + incoming.length ()); // Create a input CDR stream. // NOTE: We use the same data block in which we read the message and // we pass it on to the higher layers of the ORB. So we dont to any // copies at all here. The same is also done in the higher layers. - TAO_InputCDR input_cdr (data, + TAO_InputCDR input_cdr (incoming.data_block (), + ACE_Message_Block::DONT_DELETE, rd_pos, wr_pos, byte_order, + this->message_state_.giop_version_.major, + this->message_state_.giop_version_.minor, orb_core); // Reset the message handler to receive upcalls if any @@ -485,66 +431,32 @@ TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport, int TAO_GIOP_Message_Base::process_reply_message ( TAO_Pluggable_Reply_Params ¶ms, - ACE_Message_Block &incoming + ACE_Message_Block &incoming, + CORBA::Octet byte_order ) { - CORBA::Octet byte_order = 0; - size_t rd_pos = 0; - size_t wr_pos = 0; - ACE_Data_Block *data = 0; - size_t len = 0; - char *ptr = 0; - // At this point we have data in the queue or in the <incoming> - // message block. If we have data in the queue we process that - // first. At this point we are just assuming that (and it is a - // pretty good assumption) that the data in <incoming> has already - // been copied. - if (this->message_queue_.queue_length ()) - { - // As we have a message in the queue let us process that. We do - // that by taking the data block off the queue and sticking it - // in the <incoming> message block.. - data = - this->message_queue_.get_current_message (byte_order); - - ptr = data->base (); - // Set the read and write pointer positions - // @@ What do we if we get Fragmented messages? - rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN; - len = wr_pos = data->size (); - } - else - { - // Get the read and write positions before we steal data. - // @@ Bala:Need to change this - rd_pos = incoming.rd_ptr () - incoming.base (); - wr_pos = incoming.wr_ptr () - incoming.base (); - rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN; - byte_order = this->message_state_.byte_order_; - - data = incoming.data_block ()->duplicate (); - len = incoming.length (); - ptr = incoming.rd_ptr (); - // Duplicate the data block - // @@Do we need to ?? - // ACE_Data_Block *data = - //incoming.data_block ()->duplicate (); - } + + // Get the read and write positions before we steal data. + size_t rd_pos = incoming.rd_ptr () - incoming.base (); + size_t wr_pos = incoming.wr_ptr () - incoming.base (); + rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN; this->dump_msg ("recv", - ACE_reinterpret_cast (u_char *, ptr), - len); + ACE_reinterpret_cast (u_char *, incoming.rd_ptr ()), + incoming.length ()); - // Create a input CDR stream. + // Create a empty buffer on stack // NOTE: We use the same data block in which we read the message and // we pass it on to the higher layers of the ORB. So we dont to any // copies at all here. The same is alos done in the higher layers. - TAO_InputCDR input_cdr (data, + TAO_InputCDR input_cdr (incoming.data_block (), + ACE_Message_Block::DONT_DELETE, rd_pos, wr_pos, + this->message_state_.giop_version_.major, + this->message_state_.giop_version_.minor, byte_order); - // Reset the message state. Now, we are ready for the next nested // upcall if any. // this->message_handler_.reset (0); @@ -1205,6 +1117,8 @@ TAO_GIOP_Message_Base::send_reply_exception ( orb_core->output_cdr_dblock_allocator (), orb_core->output_cdr_msgblock_allocator (), orb_core->orb_params ()->cdr_memcpy_tradeoff (), + TAO_DEF_GIOP_MAJOR, + TAO_DEF_GIOP_MINOR, orb_core->to_iso8859 (), orb_core->to_unicode ()); diff --git a/TAO/tao/GIOP_Message_Base.h b/TAO/tao/GIOP_Message_Base.h index dc2f9cf12e7..1ca368005fe 100644 --- a/TAO/tao/GIOP_Message_Base.h +++ b/TAO/tao/GIOP_Message_Base.h @@ -24,7 +24,7 @@ #include "tao/GIOP_Message_Generator_Parser_Impl.h" #include "tao/GIOP_Utils.h" #include "tao/GIOP_Message_State.h" -#include "tao/Incoming_Message_Queue.h" + class TAO_Pluggable_Reply_Params; @@ -107,17 +107,24 @@ public: /// @@Bala:Documentation please.. virtual int is_message_complete (ACE_Message_Block &message_block); + /// @@Bala:Documentation please.. + virtual size_t missing_data (ACE_Message_Block &message_block); + + virtual CORBA::Octet byte_order (void); + /// Process the request message that we have received on the /// connection virtual int process_request_message (TAO_Transport *transport, TAO_ORB_Core *orb_core, - ACE_Message_Block &block); + ACE_Message_Block &block, + CORBA::Octet byte_order); /// Parse the reply message that we received and return the reply /// information though <reply_info> virtual int process_reply_message ( TAO_Pluggable_Reply_Params &reply_info, - ACE_Message_Block &block); + ACE_Message_Block &block, + CORBA::Octet byte_order); /// Generate a reply message with the exception <ex>. @@ -200,9 +207,6 @@ private: /// incoming messages TAO_GIOP_Message_State message_state_; - /// @@Bala:Docu - TAO_Incoming_Message_Queue message_queue_; - /// Output CDR TAO_OutputCDR *output_; diff --git a/TAO/tao/GIOP_Message_State.cpp b/TAO/tao/GIOP_Message_State.cpp index 9a2401216a4..3171f3de61b 100644 --- a/TAO/tao/GIOP_Message_State.cpp +++ b/TAO/tao/GIOP_Message_State.cpp @@ -6,7 +6,7 @@ #include "tao/Pluggable.h" #include "tao/debug.h" #include "tao/GIOP_Message_Base.h" -#include "Transport.h" +//#include "Transport.h" #if !defined (__ACE_INLINE__) # include "tao/GIOP_Message_State.inl" diff --git a/TAO/tao/Incoming_Message_Queue.cpp b/TAO/tao/Incoming_Message_Queue.cpp index 13ecd6310f5..e5b239b775c 100644 --- a/TAO/tao/Incoming_Message_Queue.cpp +++ b/TAO/tao/Incoming_Message_Queue.cpp @@ -1,6 +1,6 @@ #include "Incoming_Message_Queue.h" #include "ORB_Core.h" -#include "GIOP_Message_State.h" +#include "debug.h" #if !defined (__ACE_INLINE__) # include "Incoming_Message_Queue.inl" @@ -22,98 +22,64 @@ TAO_Incoming_Message_Queue::~TAO_Incoming_Message_Queue (void) } int -TAO_Incoming_Message_Queue::add_message (const ACE_Message_Block &block, - const TAO_GIOP_Message_State &state) +TAO_Incoming_Message_Queue::add_message (ACE_Message_Block *incoming, + size_t missing_data, + CORBA::Octet byte_order) + { + // Allocate memory for TAO_Queued_Data + TAO_Queued_Data *qd = this->get_node (); - // Check whether the last message in the Queue has a half message - if (this->size_ > 0 && - this->queued_data_->missing_data_ > 0) + if (qd == 0) { - // Create the message block - ACE_Message_Block mb (this->queued_data_->data_block_); + if (TAO_debug_level > 0) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) Could not make a node \n"))); + } + return -1; + } + + // Set the data block + qd->msg_block_ = incoming; - // Duplicate the message block so that the data block is not - // deleted by the above message block - this->queued_data_->data_block_->duplicate (); + // Set the byte_order + qd->byte_order_ = byte_order; - // Set the write pointer in the message block so that we can - // copy in the message block. To do that we need to calculate - // the write pointer position. - size_t wr_pos = - this->queued_data_->data_block_->size () - - this->queued_data_->missing_data_; + qd->missing_data_ = missing_data; - mb.wr_ptr (wr_pos); + this->add_node (qd); + + // increment the size of the list + ++this->size_; + + return 1; +} - // If we have received more data than the missing data we copy - // only the missing data. If we have received less then we copy - // all the data. So we calculate how much of data to copy. +void +TAO_Incoming_Message_Queue::copy_message (ACE_Message_Block &block) +{ + if (this->size_ > 0) + { size_t n = 0; - if (block.length () > - this->queued_data_->missing_data_) + if (block.length () <= this->queued_data_->missing_data_) { - n = this->queued_data_->missing_data_; + n = block.length (); } else { - n = block.length (); + n = this->queued_data_->missing_data_; } - // Now do the copy of the missing data. - mb.copy (block.rd_ptr (), - n); - - // Now that we have copied n bytes, let us decrease the - // <missing_data_> by n. + this->queued_data_->msg_block_->copy (block.rd_ptr (), + n); this->queued_data_->missing_data_ -= n; - } - else - { - // Create a data block of size of the incoming message - // @@Bala: Hard coding?? - ACE_Data_Block *db = - this->orb_core_->data_block_for_message_block (state.message_size ()); - - // Make a message block with the above data_block - ACE_Message_Block mb (db); - - // Increment the ref count so that the data block is not deleted - // when the message block goes out of scope. This is necessary as we - // have created the message block on the stack - db->duplicate (); - - // Do a copy of the message in to the above data block - mb.copy (block.rd_ptr (), - block.length ()); - - // Allocate memory for TAO_Queued_Data - TAO_Queued_Data *qd = this->get_node (); - - // Set the data block - qd->data_block_ = db; - - // Set the byte_order - qd->byte_order_ = state.byte_order (); - - if (state.message_size () <= block.length ()) - qd->missing_data_ = 0; - else - qd->missing_data_ = state.message_size () - block.length (); - - this->add_node (qd); - - // increment the size of the list - ++this->size_; - } - - return 1; } -ACE_Data_Block * -TAO_Incoming_Message_Queue::get_current_message (CORBA::Octet &byte_order) +ACE_Message_Block * +TAO_Incoming_Message_Queue::dequeue_head (CORBA::Octet &byte_order) { TAO_Queued_Data *tmp = this->queued_data_->next_; @@ -121,9 +87,8 @@ TAO_Incoming_Message_Queue::get_current_message (CORBA::Octet &byte_order) if (tmp->missing_data_ != 0) return 0; - ACE_Data_Block *db = - tmp->data_block_; - + ACE_Message_Block *db = + tmp->msg_block_; this->queued_data_->next_ = tmp->next_; byte_order = tmp->byte_order_; diff --git a/TAO/tao/Incoming_Message_Queue.h b/TAO/tao/Incoming_Message_Queue.h index ef29b6ac7e1..ffe0f379e1d 100644 --- a/TAO/tao/Incoming_Message_Queue.h +++ b/TAO/tao/Incoming_Message_Queue.h @@ -23,8 +23,8 @@ /// Forward declarations class ACE_Data_Block; class TAO_ORB_Core; -class TAO_GIOP_Message_State; class TAO_Queued_Data; +class TAO_Transport; /** * @class TAO_Incoming_Message_Queue @@ -42,21 +42,26 @@ public: /// @@Bala:Docu - int add_message (const ACE_Message_Block &block, - const TAO_GIOP_Message_State &state); + int add_message (ACE_Message_Block *block, + size_t missing_data, + CORBA::Octet byte_order); - const ACE_Data_Block *current_message (void) const; - const CORBA::Octet current_byte_order (void) const; + void copy_message (ACE_Message_Block &block); CORBA::ULong queue_length (void); - int complete_message (void); + int is_complete_message (void); + size_t missing_data (void) const; + void missing_data (size_t data); - ACE_Data_Block *get_current_message (CORBA::Octet &byte_order); + char *wr_ptr (void) const; + ACE_Message_Block *dequeue_head (CORBA::Octet &byte_order); private: + friend class TAO_Transport; + /// @@Bala:Docu class TAO_Export TAO_Queued_Data { @@ -64,7 +69,7 @@ private: TAO_Queued_Data (void); /// The actual message queue - ACE_Data_Block *data_block_; + ACE_Message_Block *msg_block_; CORBA::ULong missing_data_; diff --git a/TAO/tao/Incoming_Message_Queue.inl b/TAO/tao/Incoming_Message_Queue.inl index 8bb81f3a247..10713dcf732 100644 --- a/TAO/tao/Incoming_Message_Queue.inl +++ b/TAO/tao/Incoming_Message_Queue.inl @@ -1,31 +1,32 @@ // -*- C++ -*- //$Id$ -ACE_INLINE const ACE_Data_Block * -TAO_Incoming_Message_Queue::current_message (void) const +ACE_INLINE CORBA::ULong +TAO_Incoming_Message_Queue::queue_length (void) { - - return this->queued_data_->data_block_; + return this->size_; } -ACE_INLINE const CORBA::Octet -TAO_Incoming_Message_Queue::current_byte_order (void) const +ACE_INLINE int +TAO_Incoming_Message_Queue::is_complete_message (void) { - return this->queued_data_->byte_order_; -} + if (this->size_ != 0 && + this->queued_data_->missing_data_ == 0) + return 0; + return 1; +} -ACE_INLINE CORBA::ULong -TAO_Incoming_Message_Queue::queue_length (void) +ACE_INLINE char * +TAO_Incoming_Message_Queue::wr_ptr (void) const { - return this->size_; + return this->queued_data_->msg_block_->wr_ptr (); } -ACE_INLINE int -TAO_Incoming_Message_Queue::complete_message (void) +ACE_INLINE size_t +TAO_Incoming_Message_Queue::missing_data (void) const { - if (this->size_ != 0 && - this->queued_data_->next_->missing_data_ == 0) - return 1; + if (this->size_ != 0) + return this->queued_data_->missing_data_; return 0; } @@ -46,7 +47,7 @@ TAO_Incoming_Message_Queue::get_node (void) ACE_INLINE TAO_Incoming_Message_Queue::TAO_Queued_Data::TAO_Queued_Data (void) - : data_block_ (0), + : msg_block_ (0), missing_data_ (0), byte_order_ (0), next_ (0) diff --git a/TAO/tao/LIST_OF_TODO b/TAO/tao/LIST_OF_TODO index d73ac6b0c36..7e33a70257a 100644 --- a/TAO/tao/LIST_OF_TODO +++ b/TAO/tao/LIST_OF_TODO @@ -2,4 +2,10 @@ - test for Muli-threaded client & server - test LongUpCalls - Russle Moores test -- Pass data of different sizes from teh client to the server
\ No newline at end of file +- Pass data of different sizes from teh client to the server +- Problem with two messages coming in two diffrent GIOP versions.. +- Remove allocation of data block for reply handlers. Can use + stack based stuff +- Remove the allocation of data blocks in Reply_Params. +- AMI tests +- Big_Twoways..
\ No newline at end of file diff --git a/TAO/tao/ORB.cpp b/TAO/tao/ORB.cpp index fb0bde4a745..cf5ca61a9e2 100644 --- a/TAO/tao/ORB.cpp +++ b/TAO/tao/ORB.cpp @@ -1836,6 +1836,8 @@ CORBA_ORB::object_to_string (CORBA::Object_ptr obj, this->orb_core_->output_cdr_dblock_allocator (), this->orb_core_->output_cdr_msgblock_allocator (), this->orb_core_->orb_params ()->cdr_memcpy_tradeoff (), + TAO_DEF_GIOP_MAJOR, + TAO_DEF_GIOP_MINOR, this->orb_core_->to_iso8859 (), this->orb_core_->to_unicode ()); @@ -2030,7 +2032,11 @@ CORBA_ORB::ior_string_to_object (const char *str, int byte_order = *(mb.rd_ptr ()); mb.rd_ptr (1); mb.wr_ptr (len); - TAO_InputCDR stream (&mb, byte_order, this->orb_core_); + TAO_InputCDR stream (&mb, + byte_order, + TAO_DEF_GIOP_MAJOR, + TAO_DEF_GIOP_MINOR, + this->orb_core_); CORBA::Object_ptr objref = CORBA::Object::_nil (); stream >> objref; diff --git a/TAO/tao/ORB_Core.cpp b/TAO/tao/ORB_Core.cpp index a88a05e50be..5b502880ec0 100644 --- a/TAO/tao/ORB_Core.cpp +++ b/TAO/tao/ORB_Core.cpp @@ -2754,6 +2754,7 @@ TAO_ORB_Core::create_input_cdr_data_block (size_t size) lock_strategy); } + ACE_Data_Block * TAO_ORB_Core::data_block_for_message_block (size_t size) { diff --git a/TAO/tao/ORB_Core.h b/TAO/tao/ORB_Core.h index 8fce4325e8a..7421d5947fe 100644 --- a/TAO/tao/ORB_Core.h +++ b/TAO/tao/ORB_Core.h @@ -452,6 +452,9 @@ public: ACE_Data_Block *create_input_cdr_data_block (size_t size); + /// Return the locking strategy used for the data blocks. + ACE_Lock *locking_strategy (void); + /// The data blocks returned have memeory from the global pool. Will /// not get anything from the TSS even if it is available. ACE_Data_Block *data_block_for_message_block (size_t size); diff --git a/TAO/tao/ORB_Core.i b/TAO/tao/ORB_Core.i index c564ccec09b..1b3fa6bccc7 100644 --- a/TAO/tao/ORB_Core.i +++ b/TAO/tao/ORB_Core.i @@ -22,6 +22,15 @@ TAO_ORB_Core::_decr_refcnt (void) return 0; } +ACE_INLINE ACE_Lock * +TAO_ORB_Core::locking_strategy (void) +{ + if (this->resource_factory ()->use_locked_data_blocks ()) + return &this->data_block_lock_; + + return 0; +} + ACE_INLINE TAO_Transport_Cache_Manager * TAO_ORB_Core::transport_cache (void) { diff --git a/TAO/tao/Pluggable_Messaging.h b/TAO/tao/Pluggable_Messaging.h index 9164611ae8e..288d946999e 100644 --- a/TAO/tao/Pluggable_Messaging.h +++ b/TAO/tao/Pluggable_Messaging.h @@ -126,17 +126,23 @@ public: /// @@Bala: Documentation please... virtual int is_message_complete (ACE_Message_Block &message_block) = 0; + virtual size_t missing_data (ACE_Message_Block &incoming) = 0; + + virtual CORBA::Octet byte_order (void) = 0; + /// Parse the request message, make an upcall and send the reply back /// to the "request initiator" virtual int process_request_message (TAO_Transport *transport, TAO_ORB_Core *orb_core, - ACE_Message_Block &m) = 0; + ACE_Message_Block &m, + CORBA::Octet byte_order) = 0; /// Parse the reply message that we received and return the reply /// information though <reply_info> virtual int process_reply_message ( TAO_Pluggable_Reply_Params &reply_info, - ACE_Message_Block &m) = 0; + ACE_Message_Block &m, + CORBA::Octet byte_order) = 0; /// Generate a reply message with the exception <ex>. virtual int generate_exception_reply ( diff --git a/TAO/tao/Synch_Reply_Dispatcher.cpp b/TAO/tao/Synch_Reply_Dispatcher.cpp index 70c17a67629..d6844859755 100644 --- a/TAO/tao/Synch_Reply_Dispatcher.cpp +++ b/TAO/tao/Synch_Reply_Dispatcher.cpp @@ -61,20 +61,9 @@ TAO_Synch_Reply_Dispatcher::dispatch_reply ( // dispatcher is used because the request must be re-sent. //this->message_state_.reset (0); - // Steal the buffer so that no copying is done. - this->reply_cdr_.exchange_data_blocks (params.input_cdr_); - - /*if (&this->message_state_ != message_state) - { - // The Transport Mux Strategy did not use our Message_State to - // receive the event, possibly because it is muxing multiple - // requests over the same connection. - - // Steal the buffer so that no copying is done. - this->message_state_.cdr.steal_from (message_state->cdr); - - // There is no need to copy the other fields! - }*/ + // Transfer the <params.input_cdr_>'s content to this->reply_cdr_ + ACE_Data_Block *db = + this->reply_cdr_.clone_from (params.input_cdr_); if (this->wait_strategy_ != 0) { @@ -91,12 +80,6 @@ TAO_Synch_Reply_Dispatcher::dispatch_reply ( return 1; } -/*TAO_GIOP_Message_State * -TAO_Synch_Reply_Dispatcher::message_state (void) -{ - return &this->message_state_; -}*/ - void TAO_Synch_Reply_Dispatcher::dispatcher_bound (TAO_Transport *transport) { diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp index 8e95e7f3783..e0907921053 100644 --- a/TAO/tao/Transport.cpp +++ b/TAO/tao/Transport.cpp @@ -67,6 +67,7 @@ TAO_Transport::TAO_Transport (CORBA::ULong tag, , bidirectional_flag_ (-1) , head_ (0) , tail_ (0) + , incoming_message_queue_ (orb_core) , current_deadline_ (ACE_Time_Value::zero) , flush_timer_id_ (-1) , transport_timer_ (this) @@ -733,7 +734,37 @@ TAO_Transport::recv (char *buffer, return -1; // now call the template method - return this->recv_i (buffer, len, timeout); + ssize_t n = + this->recv_i (buffer, len, timeout); + + // Most of the errors handling is common for + // Now the message has been read + if (n == -1 && TAO_debug_level > 0) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - %p \n"), + ACE_TEXT ("TAO - read message failure \n") + ACE_TEXT ("TAO - handle_input () \n"))); + } + + // Error handling + if (n == -1) + { + if (errno == EWOULDBLOCK) + return 0; + + // Close the connection + this->tms_->connection_closed (); + + return -1; + } + // @@ What are the other error handling here?? + else if (n == 0) + { + return -1; + } + + return n; } @@ -795,9 +826,20 @@ TAO_Transport::handle_input_i (ACE_HANDLE h, sizeof buf); #endif /* ACE_HAS_PURIFY */ + // Create a data block + ACE_Data_Block db (sizeof (buf), + ACE_Message_Block::MB_DATA, + buf, + this->orb_core_->message_block_buffer_allocator (), + this->orb_core_->locking_strategy (), + ACE_Message_Block::DONT_DELETE, + this->orb_core_->message_block_dblock_allocator ()); + // Create a message block - ACE_Message_Block message_block (buf, - sizeof (buf)); + ACE_Message_Block message_block (&db, + ACE_Message_Block::DONT_DELETE, + this->orb_core_->message_block_msgblock_allocator ()); + // Align the message block ACE_CDR::mb_align (&message_block); @@ -809,37 +851,53 @@ TAO_Transport::handle_input_i (ACE_HANDLE h, message_block.space (), max_wait_time); - // Now the message has been read - if (n == -1 && TAO_debug_level > 0) + + if (n <= 0) + return n; + + + // Set the write pointer in the stack buffer + message_block.wr_ptr (n); + + if (this->parse_incoming_messages (message_block) == -1) + return -1; + + // Check whether we have a complete message for processing + size_t missing_data = + this->missing_data (message_block); + + if (missing_data) { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - %p \n"), - ACE_TEXT ("TAO - read message failure \n") - ACE_TEXT ("TAO - handle_input () \n"))); + return this->consolidate_message (message_block, + missing_data, + h, + max_wait_time); } - // Error handling - if (n == -1) - { - if (errno == EWOULDBLOCK) - return 0; - // Close the connection - this->tms_->connection_closed (); + // @@Bala: + return this->process_parsed_messages ( + message_block, + this->messaging_object ()->byte_order (), + h); +} - return -1; - } - // @@ What are the other error handling here?? - else if (n == 0) + + +int +TAO_Transport::parse_incoming_messages (ACE_Message_Block &message_block) +{ + // If we have a queue and if the last message is not complete a + // complete one, then this read will get us the remaining data. So + // do not try to parse the header if we have an incomplete message + // in the queue. + if (!this->incoming_message_queue_.is_complete_message ()) { - return -1; + return 0; } - // Set the write pointer in the stack buffer - message_block.wr_ptr (n); - - // Now that the message has been read, process the message. Call the - // transport object to do the processing. + // Now that a new message has been read, process the message. Call + // the messaging object to do the parsing if (this->messaging_object ()->parse_incoming_messages (message_block) == -1) { if (TAO_debug_level > 0) @@ -851,28 +909,154 @@ TAO_Transport::handle_input_i (ACE_HANDLE h, return -1; } - // @@Bala: - return this->process_parsed_messages (message_block, h); + return 0; } + +size_t +TAO_Transport::missing_data (ACE_Message_Block &incoming) +{ + // If we have message in the queue then find out how much of data + // is required to get a complete message + if (this->incoming_message_queue_.queue_length ()) + { + return this->incoming_message_queue_.missing_data (); + } + + return this->messaging_object ()->missing_data (incoming); +} + + +int +TAO_Transport::consolidate_message (ACE_Message_Block &incoming, + size_t missing_data, + ACE_HANDLE h, + ACE_Time_Value *max_wait_time) +{ + // The write pointer which will be used for reading data from the + // socket. + char *wr_ptr = 0; + if (!this->incoming_message_queue_.is_complete_message ()) + { + return this->consolidate_message_queue (incoming, + missing_data, + h, + max_wait_time); + } + + // Calculate the actual length of the load that we are supposed to + // read which is equal to the <missing_data> + length of the buffer + // that we have.. + size_t payload = missing_data + incoming.length (); + + // Grow the buffer to the size of the message + ACE_CDR::grow (&incoming, + payload); + + // .. do a read on the socket again. + ssize_t n = this->recv (incoming.wr_ptr (), + missing_data, + max_wait_time); + + // If we got an EWOULDBLOCK or some other error.. + if (n <= 0) + return n; + + // Move the write pointer + incoming.wr_ptr (n); + + // ..Decrement + missing_data -= n; + + // Get the byte order information + CORBA::Octet byte_order = + this->messaging_object ()->byte_order (); + + if (missing_data > 0) + { + // Duplicate the message block + ACE_Message_Block *mb = + incoming.duplicate (); + + // Stick the message in queue with the byte order information + if (this->incoming_message_queue_.add_message (mb, + missing_data, + byte_order) == -1) + { + return -1; + } + return 0; + } + + // Now we have a full message in our buffer. Just go ahead and + // process that + return this->process_parsed_messages (incoming, + byte_order, + h); +} + +int +TAO_Transport::consolidate_message_queue (ACE_Message_Block &incoming, + size_t missing_data, + ACE_HANDLE h, + ACE_Time_Value *max_wait_time) +{ + // If the queue did not have a complete message put this piece of + // message in the queue + this->incoming_message_queue_.copy_message (incoming); + missing_data = this->incoming_message_queue_.missing_data (); + + if (missing_data > 0) + { + // Read the message into the last node of the message queue.. + ssize_t n = this->recv (this->incoming_message_queue_.wr_ptr (), + missing_data, + max_wait_time); + + // Error... + if (n <= 0) + return n; + + // Move the write pointer + incoming.wr_ptr (n); + + // Decrement the missing data + this->incoming_message_queue_.queued_data_->missing_data_ -= n; + } + + + if (!this->incoming_message_queue_.is_complete_message ()) + { + return 0; + } + + CORBA::Octet byte_order = 0; + + // Get the message on the head of the queue.. + ACE_Message_Block *msg_block = + this->incoming_message_queue_.dequeue_head (byte_order); + + // Process the message... + if (this->process_parsed_messages (*msg_block, + byte_order, + h) == -1) + return -1; + + // Delete the message block... + delete msg_block; + + return 0; +} int TAO_Transport::process_parsed_messages (ACE_Message_Block &message_block, + CORBA::Octet byte_order, ACE_HANDLE h) { - // Check whether we have a complete message for processing - int retval = - this->messaging_object ()->is_message_complete (message_block); - // If we have a complete message, just resume the handler // Resume the handler. // @@Bala: Try to solve this issue of reactor resumptions.. this->orb_core_->reactor ()->resume_handler (h); - // If we dont have a complete message then just return 1 to the - // reactor so that we can be called back for further reading - if (!retval) - return 1; - // Get the <message_type> that we have received TAO_Pluggable_Message_Type t = this->messaging_object ()->message_type (); @@ -898,7 +1082,8 @@ TAO_Transport::process_parsed_messages (ACE_Message_Block &message_block, if (this->messaging_object ()->process_request_message ( this, this->orb_core (), - message_block) == -1) + message_block, + byte_order) == -1) { // Close the TMS this->tms_->connection_closed (); @@ -915,7 +1100,8 @@ TAO_Transport::process_parsed_messages (ACE_Message_Block &message_block, TAO_Pluggable_Reply_Params params (this->orb_core ()); if (this->messaging_object ()->process_reply_message (params, - message_block) == -1) + message_block, + byte_order) == -1) { if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h index 2a65caec6c2..64720a8c501 100644 --- a/TAO/tao/Transport.h +++ b/TAO/tao/Transport.h @@ -19,15 +19,18 @@ #include "ace/pre.h" #include "corbafwd.h" + + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + #include "Exception.h" #include "Transport_Descriptor_Interface.h" #include "Transport_Cache_Manager.h" #include "Transport_Timer.h" #include "ace/Strategies.h" - -#if !defined (ACE_LACKS_PRAGMA_ONCE) -# pragma once -#endif /* ACE_LACKS_PRAGMA_ONCE */ +#include "Incoming_Message_Queue.h" class TAO_ORB_Core; class TAO_Target_Specification; @@ -564,7 +567,25 @@ protected: virtual void transition_handler_state_i (void) = 0; /// @@ Bala: Documentation + int parse_incoming_messages (ACE_Message_Block &message_block); + + size_t missing_data (ACE_Message_Block &message_block); + + int check_message_integrity (ACE_Message_Block &message_block); + + int consolidate_message (ACE_Message_Block &incoming, + size_t missing_data, + ACE_HANDLE h, + ACE_Time_Value *max_wait_time); + + int consolidate_message_queue (ACE_Message_Block &incoming, + size_t missing_data, + ACE_HANDLE h, + ACE_Time_Value *max_wait_time); + + /// @@ Bala: Documentation virtual int process_parsed_messages (ACE_Message_Block &message_block, + CORBA::Octet byte_order, ACE_HANDLE h = ACE_INVALID_HANDLE); public: @@ -769,6 +790,9 @@ protected: TAO_Queued_Message *head_; TAO_Queued_Message *tail_; + /// @@Bala: Docu?? + TAO_Incoming_Message_Queue incoming_message_queue_; + /// The queue will start draining no later than <queing_deadline_> /// *if* the deadline is ACE_Time_Value current_deadline_; |