diff options
author | bala <balanatarajan@users.noreply.github.com> | 2001-06-28 15:09:27 +0000 |
---|---|---|
committer | bala <balanatarajan@users.noreply.github.com> | 2001-06-28 15:09:27 +0000 |
commit | c4a1528ea7c36701e25b594ae63d13d12bfcc599 (patch) | |
tree | 8b0f572451c9bc5e71b772ed84da1d69a5d1726d | |
parent | dd8e7d30aff4087b2243f36e282e5923ec35ae41 (diff) | |
download | ATCD-c4a1528ea7c36701e25b594ae63d13d12bfcc599.tar.gz |
ChangeLogTag:Thu Jun 28 09:30:43 2001 Balachandran Natarajan <bala@cs.wustl.edu>
-rw-r--r-- | TAO/tao/Connection_Handler.cpp | 7 | ||||
-rw-r--r-- | TAO/tao/GIOP_Message_Base.cpp | 191 | ||||
-rw-r--r-- | TAO/tao/GIOP_Message_Base.h | 23 | ||||
-rw-r--r-- | TAO/tao/GIOP_Message_Base.i | 9 | ||||
-rw-r--r-- | TAO/tao/GIOP_Message_State.h | 12 | ||||
-rw-r--r-- | TAO/tao/IIOP_Connection_Handler.cpp | 36 | ||||
-rw-r--r-- | TAO/tao/Incoming_Message_Queue.cpp | 97 | ||||
-rw-r--r-- | TAO/tao/Incoming_Message_Queue.h | 62 | ||||
-rw-r--r-- | TAO/tao/Incoming_Message_Queue.inl | 59 | ||||
-rw-r--r-- | TAO/tao/LIST_OF_TODO | 7 | ||||
-rw-r--r-- | TAO/tao/Pluggable_Messaging.h | 16 | ||||
-rw-r--r-- | TAO/tao/TAO.dsp | 50 | ||||
-rw-r--r-- | TAO/tao/Transport.cpp | 204 | ||||
-rw-r--r-- | TAO/tao/Transport.h | 21 | ||||
-rw-r--r-- | TAO/tao/Wait_On_Read.cpp | 4 |
15 files changed, 513 insertions, 285 deletions
diff --git a/TAO/tao/Connection_Handler.cpp b/TAO/tao/Connection_Handler.cpp index 099e79d5eef..049b1d15a49 100644 --- a/TAO/tao/Connection_Handler.cpp +++ b/TAO/tao/Connection_Handler.cpp @@ -6,6 +6,7 @@ #include "tao/debug.h" #include "tao/Object.h" #include "tao/Messaging_Policy_i.h" +#include "Resume_Handle.h" #if !defined (__ACE_INLINE__) #include "tao/Connection_Handler.inl" @@ -90,11 +91,15 @@ TAO_Connection_Handler::svc_i (void) max_wait_time = ¤t_timeout; } + TAO_Resume_Handle rh (this->orb_core_, + ACE_INVALID_HANDLE); + while (!this->orb_core_->has_shutdown () && result >= 0) { result = - this->transport ()->handle_input_i (ACE_INVALID_HANDLE, max_wait_time); + this->transport ()->handle_input_i (rh, + max_wait_time); if (result == -1 && errno == ETIME) { diff --git a/TAO/tao/GIOP_Message_Base.cpp b/TAO/tao/GIOP_Message_Base.cpp index 8fb375beb2b..e008fc760e0 100644 --- a/TAO/tao/GIOP_Message_Base.cpp +++ b/TAO/tao/GIOP_Message_Base.cpp @@ -20,7 +20,8 @@ ACE_RCSID (tao, GIOP_Message_Base, "$Id$") TAO_GIOP_Message_Base::TAO_GIOP_Message_Base (TAO_ORB_Core *orb_core, size_t /*input_cdr_size*/) - : message_state_ (orb_core, + : orb_core_ (orb_core), + message_state_ (orb_core, this), output_ (0), generator_parser_ (0) @@ -323,9 +324,7 @@ TAO_GIOP_Message_Base::parse_incoming_messages (ACE_Message_Block &incoming) return -1; } - // Set the state internally for parsing and generating messages - this->set_state (this->message_state_.giop_version_.major, - this->message_state_.giop_version_.minor); + return 0; } @@ -336,12 +335,12 @@ TAO_GIOP_Message_Base::missing_data (ACE_Message_Block &incoming) CORBA::ULong msg_size = this->message_state_.message_size (); - ssize_t len = incoming.length (); + size_t len = incoming.length (); if (len > msg_size) { // Move the rd_ptr so that we can extract the next message. - incoming.rd_ptr (msg_size); + // incoming.rd_ptr (msg_size); return -1; } else if (len == msg_size) @@ -353,55 +352,156 @@ TAO_GIOP_Message_Base::missing_data (ACE_Message_Block &incoming) int TAO_GIOP_Message_Base::extract_next_message (ACE_Message_Block &incoming, - TAO_Queued_Data *qd) + TAO_Queued_Data *&qd) { - TAO_GIOP_Message_State msg_state; + TAO_GIOP_Message_State state (this->orb_core_, + this); if (incoming.length () < TAO_GIOP_MESSAGE_HEADER_LEN) { if (incoming.length () > 0) { + // Make a node which has a message block of the size of + // MESSAGE_HEADER_LEN. qd = - this->make_queued_data (incoming.length ()); + this->make_queued_data (TAO_GIOP_MESSAGE_HEADER_LEN); - qd.missing_data_ = -1; + qd->msg_block_->copy (incoming.rd_ptr (), + incoming.length ()); + qd->missing_data_ = -1; } return 0; } - - if (msg_state.parse_message_header (incoming) == -1) + if (state.parse_message_header (incoming) == -1) { return -1; } - size_t copying_len = msg_state.message_size (); + size_t copying_len = state.message_size (); qd = this->make_queued_data (copying_len); if (copying_len > incoming.length ()) { - qd.missing_data_ = + qd->missing_data_ = copying_len - incoming.length (); copying_len -= incoming.length (); } - new_mb.copy (incoming.rd_ptr (), - copying_len); + qd->msg_block_->copy (incoming.rd_ptr (), + copying_len); incoming.rd_ptr (copying_len); - qd.byte_order_ = msg_state.byte_order_; - qd.major_version_ = msg_state.giop_version_.major_version; - qd.minor_version_ = msg_state.giop_version_.minor_version; + qd->byte_order_ = state.byte_order_; + qd->major_version_ = state.giop_version_.major; + qd->minor_version_ = state.giop_version_.minor; return 1; } -CORBA::Octet -TAO_GIOP_Message_Base::byte_order (void) +int +TAO_GIOP_Message_Base::consolidate_node (TAO_Queued_Data *qd, + ACE_Message_Block &incoming) { - return this->message_state_.byte_order_; + // Look to see whether we had atleast parsed the GIOP header ... + if (qd->missing_data_ == -1) + { + // The data length that has been stuck in there during the last + // read .... + size_t len = + qd->msg_block_->length (); + + // We know that we would have space for + // TAO_GIOP_MESSAGE_HEADER_LEN here. So copy that much of data + // from the <incoming> into the message block in <qd> + qd->msg_block_->copy (incoming.rd_ptr (), + len - TAO_GIOP_MESSAGE_HEADER_LEN); + + // Move the rd_ptr () in the incoming message block.. + incoming.rd_ptr (len - TAO_GIOP_MESSAGE_HEADER_LEN); + + TAO_GIOP_Message_State state (this->orb_core_, + this); + + // Parse the message header now... + if (state.parse_message_header (*qd->msg_block_) == -1) + return -1; + + // Now grow the message block so that we can copy the rest of + // the data... + ACE_CDR::grow (qd->msg_block_, + state.message_size ()); + + // Copy the pay load.. + + // Calculate the bytes that needs to be copied in the queue... + size_t copy_len = + state.message_size () - TAO_GIOP_MESSAGE_HEADER_LEN; + + // If teh data that needs to be copied is more than that is + // available to us .. + if (copy_len > incoming.length ()) + { + // Calculate the missing data.. + qd->missing_data_ = + copy_len - incoming.length (); + + // Set the actual possible copy_len that is available... + copy_len = incoming.length (); + } + + // ..now we are set to copy the right amount of data to the + // node.. + qd->msg_block_->copy (incoming.rd_ptr (), + copy_len); + + // Set the <rd_ptr> of the <incoming>.. + qd->msg_block_->rd_ptr (copy_len); + + // Get the other details... + qd->byte_order_ = state.byte_order_; + qd->major_version_ = state.giop_version_.major; + qd->minor_version_ = state.giop_version_.minor; + } + else + { + // @@todo: Need to abstract this out to a seperate method... + size_t copy_len = qd->missing_data_; + + if (copy_len > incoming.length ()) + { + // Calculate the missing data.. + qd->missing_data_ = + copy_len - incoming.length (); + + // Set the actual possible copy_len that is available... + copy_len = incoming.length (); + } + + // Copy the right amount of data in to the node... + // node.. + qd->msg_block_->copy (incoming.rd_ptr (), + copy_len); + + // Set the <rd_ptr> of the <incoming>.. + qd->msg_block_->rd_ptr (copy_len); + } + + return 0; +} + + +void +TAO_GIOP_Message_Base::get_message_data (TAO_Queued_Data *qd) +{ + qd->byte_order_ = + this->message_state_.byte_order_; + qd->major_version_ = + this->message_state_.giop_version_.major; + qd->minor_version_ = + this->message_state_.giop_version_.minor; } int @@ -414,24 +514,29 @@ TAO_GIOP_Message_Base::is_message_complete (ACE_Message_Block & /*incoming*/) int TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport, TAO_ORB_Core *orb_core, - ACE_Message_Block &incoming, - CORBA::Octet byte_order) + TAO_Queued_Data *qd) + { // Set the upcall thread orb_core->lf_strategy ().set_upcall_thread (orb_core->leader_follower ()); + // Set the state internally for parsing and generating messages + this->set_state (qd->major_version_, + qd->minor_version_); + // Reset the output CDR stream. // @@@@Is it necessary here? this->output_->reset (); // Get the read and write positions before we steal data. - size_t rd_pos = incoming.rd_ptr () - incoming.base (); - size_t wr_pos = incoming.wr_ptr () - incoming.base (); + size_t rd_pos = qd->msg_block_->rd_ptr () - qd->msg_block_->base (); + size_t wr_pos = qd->msg_block_->wr_ptr () - qd->msg_block_->base (); rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN; this->dump_msg ("recv", - ACE_reinterpret_cast (u_char *, incoming.rd_ptr ()), - incoming.length ()); + ACE_reinterpret_cast (u_char *, + qd->msg_block_->rd_ptr ()), + qd->msg_block_->length ()); // Create a input CDR stream. @@ -439,13 +544,13 @@ TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport, // we pass it on to the higher layers of the ORB. So we dont to any // copies at all here. The same is also done in the higher layers. - TAO_InputCDR input_cdr (incoming.data_block (), + TAO_InputCDR input_cdr (qd->msg_block_->data_block (), ACE_Message_Block::DONT_DELETE, rd_pos, wr_pos, - byte_order, - this->message_state_.giop_version_.major, - this->message_state_.giop_version_.minor, + qd->byte_order_, + qd->major_version_, + qd->minor_version_, orb_core); // Set giop version info for the outstream so that server replies @@ -485,32 +590,32 @@ TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport, int TAO_GIOP_Message_Base::process_reply_message ( TAO_Pluggable_Reply_Params ¶ms, - ACE_Message_Block &incoming, - CORBA::Octet byte_order - ) + TAO_Queued_Data *qd) { // Get the read and write positions before we steal data. - size_t rd_pos = incoming.rd_ptr () - incoming.base (); - size_t wr_pos = incoming.wr_ptr () - incoming.base (); + size_t rd_pos = qd->msg_block_->rd_ptr () - qd->msg_block_->base (); + size_t wr_pos = qd->msg_block_->wr_ptr () - qd->msg_block_->base (); rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN; this->dump_msg ("recv", - ACE_reinterpret_cast (u_char *, incoming.rd_ptr ()), - incoming.length ()); + ACE_reinterpret_cast (u_char *, + qd->msg_block_->rd_ptr ()), + qd->msg_block_->length ()); // Create a empty buffer on stack // NOTE: We use the same data block in which we read the message and // we pass it on to the higher layers of the ORB. So we dont to any // copies at all here. The same is alos done in the higher layers. - TAO_InputCDR input_cdr (incoming.data_block (), + TAO_InputCDR input_cdr (qd->msg_block_->data_block (), ACE_Message_Block::DONT_DELETE, rd_pos, wr_pos, - this->message_state_.giop_version_.major, - this->message_state_.giop_version_.minor, - byte_order); + qd->byte_order_, + qd->major_version_, + qd->minor_version_, + this->orb_core_); // Reset the message state. Now, we are ready for the next nested // upcall if any. diff --git a/TAO/tao/GIOP_Message_Base.h b/TAO/tao/GIOP_Message_Base.h index e3f03e6ebec..1fb13974f06 100644 --- a/TAO/tao/GIOP_Message_Base.h +++ b/TAO/tao/GIOP_Message_Base.h @@ -27,6 +27,7 @@ class TAO_Pluggable_Reply_Params; +class TAO_Queued_Data; /** * @class TAO_GIOP_Message_Base @@ -111,23 +112,26 @@ public: virtual ssize_t missing_data (ACE_Message_Block &message_block); virtual int extract_next_message (ACE_Message_Block &incoming, - TAO_Queued_Data *qd); + TAO_Queued_Data *&qd); - virtual CORBA::Octet byte_order (void); + virtual int consolidate_node (TAO_Queued_Data *qd, + ACE_Message_Block &incoming); + + virtual void get_message_data (TAO_Queued_Data *qd); /// Process the request message that we have received on the /// connection virtual int process_request_message (TAO_Transport *transport, TAO_ORB_Core *orb_core, - ACE_Message_Block &block, - CORBA::Octet byte_order); + TAO_Queued_Data *qd); + /// Parse the reply message that we received and return the reply /// information though <reply_info> virtual int process_reply_message ( TAO_Pluggable_Reply_Params &reply_info, - ACE_Message_Block &block, - CORBA::Octet byte_order); + TAO_Queued_Data *qd); + /// Generate a reply message with the exception <ex>. @@ -204,10 +208,15 @@ private: /// Are there any more messages that needs processing virtual int more_messages (void); - /// @@Bala:Docu?? + /// Creates a new node for the queue with a message block in the + /// node of size <sz>.. TAO_Queued_Data *make_queued_data (size_t sz); + private: + /// Cached ORB_Core pointer... + TAO_ORB_Core *orb_core_; + /// Thr message handler object that does reading and parsing of the /// incoming messages TAO_GIOP_Message_State message_state_; diff --git a/TAO/tao/GIOP_Message_Base.i b/TAO/tao/GIOP_Message_Base.i index 33bebef4aac..c68638d5bb3 100644 --- a/TAO/tao/GIOP_Message_Base.i +++ b/TAO/tao/GIOP_Message_Base.i @@ -5,9 +5,10 @@ // GIOP_Message_Base // TAO_Queued_Data * -GIOP_Message_Base::make_queued_data (size_t sz) +TAO_GIOP_Message_Base::make_queued_data (size_t sz) { - qd = TAO_Incoming_Message_Queue::get_queued_data (); + TAO_Queued_Data *qd = + TAO_Queued_Data::get_queued_data (); ACE_Data_Block *db = this->orb_core_->data_block_for_message_block (sz); @@ -21,8 +22,10 @@ GIOP_Message_Base::make_queued_data (size_t sz) ACE_Message_Block *new_mb = mb.duplicate (); - qd.msg_block_ = new_mb; ACE_CDR::mb_align (new_mb); + qd->msg_block_ = new_mb; + + return qd; } diff --git a/TAO/tao/GIOP_Message_State.h b/TAO/tao/GIOP_Message_State.h index f6d7cd2d4a9..8a9a878f597 100644 --- a/TAO/tao/GIOP_Message_State.h +++ b/TAO/tao/GIOP_Message_State.h @@ -26,6 +26,7 @@ class TAO_ORB_Core; +class TAO_GIOP_Message_Base; /** @@ -45,10 +46,11 @@ class TAO_ORB_Core; class TAO_Export TAO_GIOP_Message_State { - public: - friend class TAO_GIOP_Message_Base; + /// Ctor + TAO_GIOP_Message_State (TAO_ORB_Core *orb_core, + TAO_GIOP_Message_Base *base); enum TAO_GIOP_Message_Status { @@ -77,9 +79,9 @@ public: private: - /// Ctor - TAO_GIOP_Message_State (TAO_ORB_Core *orb_core, - TAO_GIOP_Message_Base *base); + friend class TAO_GIOP_Message_Base; + + /// @@Bala: Documentation please... int parse_message_header_i (ACE_Message_Block &incoming); diff --git a/TAO/tao/IIOP_Connection_Handler.cpp b/TAO/tao/IIOP_Connection_Handler.cpp index 28891e5e092..de72ecbb1e0 100644 --- a/TAO/tao/IIOP_Connection_Handler.cpp +++ b/TAO/tao/IIOP_Connection_Handler.cpp @@ -1,17 +1,18 @@ // $Id$ -#include "tao/IIOP_Connection_Handler.h" -#include "tao/Timeprobe.h" -#include "tao/debug.h" -#include "tao/ORB_Core.h" -#include "tao/ORB.h" -#include "tao/CDR.h" -#include "tao/Messaging_Policy_i.h" -#include "tao/Server_Strategy_Factory.h" -#include "tao/IIOP_Transport.h" -#include "tao/IIOP_Endpoint.h" -#include "tao/Transport_Cache_Manager.h" -#include "tao/Base_Transport_Property.h" +#include "IIOP_Connection_Handler.h" +#include "Timeprobe.h" +#include "debug.h" +#include "ORB_Core.h" +#include "ORB.h" +#include "CDR.h" +#include "Messaging_Policy_i.h" +#include "Server_Strategy_Factory.h" +#include "IIOP_Transport.h" +#include "IIOP_Endpoint.h" +#include "Transport_Cache_Manager.h" +#include "Base_Transport_Property.h" +#include "Resume_Handle.h" #if !defined (__ACE_INLINE__) # include "tao/IIOP_Connection_Handler.i" @@ -250,8 +251,12 @@ TAO_IIOP_Connection_Handler::resume_handler (void) } int -TAO_IIOP_Connection_Handler::handle_output (ACE_HANDLE) +TAO_IIOP_Connection_Handler::handle_output (ACE_HANDLE h) { + // Instantiate the resume handle here.. This will automatically + // resume the handle once data is written.. + TAO_Resume_Handle resume_handle (this->orb_core (), + h); return this->transport ()->handle_output (); } @@ -322,7 +327,10 @@ TAO_IIOP_Connection_Handler::handle_input (ACE_HANDLE h) // Increase the reference count on the upcall that have passed us. this->pending_upcalls_++; - int retval = this->transport ()->handle_input_i (h); + TAO_Resume_Handle resume_handle (this->orb_core (), + h); + + int retval = this->transport ()->handle_input_i (resume_handle); // The upcall is done. Bump down the reference count if (--this->pending_upcalls_ <= 0) diff --git a/TAO/tao/Incoming_Message_Queue.cpp b/TAO/tao/Incoming_Message_Queue.cpp index 910af84ab26..3582b151019 100644 --- a/TAO/tao/Incoming_Message_Queue.cpp +++ b/TAO/tao/Incoming_Message_Queue.cpp @@ -1,6 +1,7 @@ #include "Incoming_Message_Queue.h" #include "ORB_Core.h" #include "debug.h" +#include "Pluggable_Messaging_Utils.h" #if !defined (__ACE_INLINE__) # include "Incoming_Message_Queue.inl" @@ -21,41 +22,6 @@ TAO_Incoming_Message_Queue::~TAO_Incoming_Message_Queue (void) // Need to delete all the unused data-blocks } -int -TAO_Incoming_Message_Queue::add_message (ACE_Message_Block *incoming, - ssize_t missing_data, - CORBA::Octet byte_order) - -{ - // Allocate memory for TAO_Queued_Data - TAO_Queued_Data *qd = this->get_node (); - - if (qd == 0) - { - if (TAO_debug_level > 0) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) Could not make a node \n"))); - } - return -1; - } - - // Set the data block - qd->msg_block_ = incoming; - - // Set the byte_order - qd->byte_order_ = byte_order; - - qd->missing_data_ = missing_data; - - this->add_node (qd); - - // increment the size of the list - ++this->size_; - - return 1; -} - void TAO_Incoming_Message_Queue::copy_message (ACE_Message_Block &block) { @@ -63,7 +29,7 @@ TAO_Incoming_Message_Queue::copy_message (ACE_Message_Block &block) { size_t n = 0; - if (block.length () <= this->queued_data_->missing_data_) + if ((CORBA::Long)block.length () <= this->queued_data_->missing_data_) { n = block.length (); } @@ -78,32 +44,54 @@ TAO_Incoming_Message_Queue::copy_message (ACE_Message_Block &block) } } -ACE_Message_Block * -TAO_Incoming_Message_Queue::dequeue_head (CORBA::Octet &byte_order) +TAO_Queued_Data * +TAO_Incoming_Message_Queue::dequeue_head (void) { + // Get the node on the head of the queue... TAO_Queued_Data *tmp = this->queued_data_->next_; - if (tmp->missing_data_ != 0) + // Reset the head node.. + this->queued_data_->next_ = tmp->next_; + + // Decrease the size + --this->size_; + + return tmp; +} + +TAO_Queued_Data * +TAO_Incoming_Message_Queue::dequeue_tail (void) +{ + // This is a bit painful stuff... + if (this->size_ == 0) return 0; - ACE_Message_Block *db = - tmp->msg_block_; + // Get the node on the head of the queue... + TAO_Queued_Data *tmp = + this->queued_data_->next_; - this->queued_data_->next_ = tmp->next_; - byte_order = tmp->byte_order_; + while (tmp->next_ != this->queued_data_) + { + tmp = tmp->next_; + } + + // Put the head in tmp. + tmp->next_ = this->queued_data_->next_; + + TAO_Queued_Data *ret_qd = this->queued_data_; - delete tmp; + this->queued_data_ = tmp; // Decrease the size --this->size_; - return db; + return ret_qd; } + int -TAO_Incoming_Message_Queue::add_node ( - TAO_Incoming_Message_Queue::TAO_Queued_Data *nd) +TAO_Incoming_Message_Queue::enqueue_tail (TAO_Queued_Data *nd) { if (this->size_ == 0) { @@ -117,5 +105,20 @@ TAO_Incoming_Message_Queue::add_node ( this->queued_data_ = nd; } + ++ this->size_; return 0; } + + +/************************************************************************/ + +TAO_Queued_Data::TAO_Queued_Data (void) + : msg_block_ (0), + missing_data_ (0), + byte_order_ (0), + major_version_ (0), + minor_version_ (0), + msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR), + next_ (0) +{ +} diff --git a/TAO/tao/Incoming_Message_Queue.h b/TAO/tao/Incoming_Message_Queue.h index 9c5515a02d9..d4f164e72d9 100644 --- a/TAO/tao/Incoming_Message_Queue.h +++ b/TAO/tao/Incoming_Message_Queue.h @@ -25,6 +25,7 @@ class ACE_Data_Block; class TAO_ORB_Core; class TAO_Queued_Data; class TAO_Transport; +enum TAO_Pluggable_Message_Type; /** * @class TAO_Incoming_Message_Queue @@ -42,55 +43,36 @@ public: /// @@Bala:Docu - int add_message (ACE_Message_Block *block, + /* int add_message (ACE_Message_Block *block, ssize_t missing_data, - CORBA::Octet byte_order); + CORBA::Octet byte_order);*/ void copy_message (ACE_Message_Block &block); CORBA::ULong queue_length (void); - int is_complete_message (void); + int is_tail_complete (void); + + int is_head_complete (void); size_t missing_data (void) const; void missing_data (size_t data); - char *wr_ptr (void) const; - ACE_Message_Block *dequeue_head (CORBA::Octet &byte_order); - - class TAO_Export TAO_Queued_Data - { - public: - TAO_Queued_Data (void); - - /// The actual message queue - ACE_Message_Block *msg_block_; + TAO_Queued_Data *dequeue_head (void); + TAO_Queued_Data *dequeue_tail (void); - CORBA::Long missing_data_; - CORBA::Octet byte_order_; - CORBA::Octet major_version_; + int enqueue_tail (TAO_Queued_Data *nd); - CORBA::Octet minor_version_; - - TAO_Queued_Data *next_; - }; - - static TAO_Queued_Data* get_queued_data (void); private: friend class TAO_Transport; /// @@Bala:Docu - - TAO_Queued_Data *get_node (void); - - int add_node (TAO_Queued_Data *nd); - private: /// TAO_Queued_Data *queued_data_; @@ -101,9 +83,35 @@ private: TAO_ORB_Core *orb_core_; }; +class TAO_Export TAO_Queued_Data +{ +public: + TAO_Queued_Data (void); + + ~TAO_Queued_Data (void); + + static TAO_Queued_Data* get_queued_data (void); + + /// The actual message queue + ACE_Message_Block *msg_block_; + + CORBA::Long missing_data_; + + CORBA::Octet byte_order_; + + CORBA::Octet major_version_; + + CORBA::Octet minor_version_; + + TAO_Pluggable_Message_Type msg_type_; + + TAO_Queued_Data *next_; +}; + #if defined (__ACE_INLINE__) # include "Incoming_Message_Queue.inl" #endif /* __ACE_INLINE__ */ +#include "ace/post.h" #endif /*TAO_INCOMING_MESSAGE_QUEUE_H*/ diff --git a/TAO/tao/Incoming_Message_Queue.inl b/TAO/tao/Incoming_Message_Queue.inl index 8057696682d..302af48511b 100644 --- a/TAO/tao/Incoming_Message_Queue.inl +++ b/TAO/tao/Incoming_Message_Queue.inl @@ -7,19 +7,30 @@ TAO_Incoming_Message_Queue::queue_length (void) } ACE_INLINE int -TAO_Incoming_Message_Queue::is_complete_message (void) +TAO_Incoming_Message_Queue::is_tail_complete (void) { - if (this->size_ != 0 && + // If the size is 0 return -1 + if (this->size_ == 0) + return -1; + + if (this->size_ && this->queued_data_->missing_data_ == 0) - return 0; + return 1; - return 1; + return 0; } -ACE_INLINE char * -TAO_Incoming_Message_Queue::wr_ptr (void) const +ACE_INLINE int +TAO_Incoming_Message_Queue::is_head_complete (void) { - return this->queued_data_->msg_block_->wr_ptr (); + if (this->size_ == 0) + return -1; + + if (this->size_ && + this->queued_data_->next_->missing_data_ == 0) + return 1; + + return 0; } ACE_INLINE size_t @@ -32,8 +43,23 @@ TAO_Incoming_Message_Queue::missing_data (void) const } -ACE_INLINE TAO_Incoming_Message_Queue::TAO_Queued_Data * -TAO_Incoming_Message_Queue::get_queued_data (void) + +ACE_INLINE TAO_Queued_Data * +TAO_Incoming_Message_Queue::get_node (void) +{ + return TAO_Queued_Data::get_queued_data (); +} + + + +ACE_INLINE +TAO_Queued_Data::~TAO_Queued_Data (void) +{ +} + + +ACE_INLINE TAO_Queued_Data * +TAO_Queued_Data::get_queued_data (void) { // @@TODO: Use the global pool for allocationg... TAO_Queued_Data *qd = 0; @@ -43,18 +69,3 @@ TAO_Incoming_Message_Queue::get_queued_data (void) return qd; } - -ACE_INLINE TAO_Incoming_Message_Queue::TAO_Queued_Data * -TAO_Incoming_Message_Queue::get_node (void) -{ - return TAO_Incoming_Message_Queue::get_queued_data (); -} - -ACE_INLINE -TAO_Incoming_Message_Queue::TAO_Queued_Data::TAO_Queued_Data (void) - : msg_block_ (0), - missing_data_ (0), - byte_order_ (0), - next_ (0) -{ -} diff --git a/TAO/tao/LIST_OF_TODO b/TAO/tao/LIST_OF_TODO index d44e3e7166a..e0499ce97e8 100644 --- a/TAO/tao/LIST_OF_TODO +++ b/TAO/tao/LIST_OF_TODO @@ -9,3 +9,10 @@ - AMI tests - run purify quantify - DSI_Gateway tests +- Remove the ORB_Core from the signature of a number of methods of + GIOP_Message_Base class +- Go through the code again & again... +- Put tms_->close_connection () wherever we get -1 as a retval.. +- Looks for a memory leak when we delete a node.. +- Change the Output CDR to be on stack.. +- Dont we want a cached transport on the server side...
\ No newline at end of file diff --git a/TAO/tao/Pluggable_Messaging.h b/TAO/tao/Pluggable_Messaging.h index 15801c595f7..d5417e7f83a 100644 --- a/TAO/tao/Pluggable_Messaging.h +++ b/TAO/tao/Pluggable_Messaging.h @@ -31,6 +31,7 @@ class TAO_Transport; class TAO_Operation_Details; class TAO_Target_Specification; class TAO_OutputCDR; +class TAO_Queued_Data; // @@ The more I think I about this class, I feel that this class need // not be a ABC as it is now. Instead we have these options @@ -128,24 +129,27 @@ public: virtual ssize_t missing_data (ACE_Message_Block &incoming) = 0; - virtual CORBA::Octet byte_order (void) = 0; + virtual void get_message_data (TAO_Queued_Data *qd) = 0; virtual int extract_next_message (ACE_Message_Block &incoming, - TAO_Queued_Data *qd) = 0; + TAO_Queued_Data *&qd) = 0; + + virtual int consolidate_node (TAO_Queued_Data *qd, + ACE_Message_Block &incoming) = 0; /// Parse the request message, make an upcall and send the reply back /// to the "request initiator" virtual int process_request_message (TAO_Transport *transport, TAO_ORB_Core *orb_core, - ACE_Message_Block &m, - CORBA::Octet byte_order) = 0; + TAO_Queued_Data *qd) = 0; + /// Parse the reply message that we received and return the reply /// information though <reply_info> virtual int process_reply_message ( TAO_Pluggable_Reply_Params &reply_info, - ACE_Message_Block &m, - CORBA::Octet byte_order) = 0; + TAO_Queued_Data *qd) = 0; + /// Generate a reply message with the exception <ex>. virtual int generate_exception_reply ( diff --git a/TAO/tao/TAO.dsp b/TAO/tao/TAO.dsp index 15ae0b9c8d4..62572777b23 100644 --- a/TAO/tao/TAO.dsp +++ b/TAO/tao/TAO.dsp @@ -411,18 +411,10 @@ SOURCE=.\GIOP_Message_Generator_Parser_Impl.cpp # End Source File
# Begin Source File
-SOURCE=.\GIOP_Message_Lite.cpp
-# End Source File
-# Begin Source File
-
SOURCE=.\GIOP_Message_Locate_Header.cpp
# End Source File
# Begin Source File
-SOURCE=.\GIOP_Message_Reactive_Handler.cpp
-# End Source File
-# Begin Source File
-
SOURCE=.\GIOP_Message_State.cpp
# End Source File
# Begin Source File
@@ -487,6 +479,10 @@ SOURCE=.\IIOPC.cpp # End Source File
# Begin Source File
+SOURCE=.\Incoming_Message_Queue.cpp
+# End Source File
+# Begin Source File
+
SOURCE=.\Interceptor_List.cpp
# End Source File
# Begin Source File
@@ -711,6 +707,10 @@ SOURCE=.\Resource_Factory.cpp # End Source File
# Begin Source File
+SOURCE=.\Resume_Handle.cpp
+# End Source File
+# Begin Source File
+
SOURCE=.\Sequence.cpp
# End Source File
# Begin Source File
@@ -1143,18 +1143,10 @@ SOURCE=.\GIOP_Message_Generator_Parser_Impl.h # End Source File
# Begin Source File
-SOURCE=.\GIOP_Message_Lite.h
-# End Source File
-# Begin Source File
-
SOURCE=.\GIOP_Message_Locate_Header.h
# End Source File
# Begin Source File
-SOURCE=.\GIOP_Message_Reactive_Handler.h
-# End Source File
-# Begin Source File
-
SOURCE=.\GIOP_Message_State.h
# End Source File
# Begin Source File
@@ -1219,6 +1211,10 @@ SOURCE=.\IIOPC.h # End Source File
# Begin Source File
+SOURCE=.\Incoming_Message_Queue.h
+# End Source File
+# Begin Source File
+
SOURCE=.\Interceptor_List.h
# End Source File
# Begin Source File
@@ -1491,6 +1487,10 @@ SOURCE=.\Resource_Factory.h # End Source File
# Begin Source File
+SOURCE=.\Resume_Handle.h
+# End Source File
+# Begin Source File
+
SOURCE=.\sequence.h
# End Source File
# Begin Source File
@@ -1859,19 +1859,11 @@ SOURCE=.\GIOP_Message_Generator_Parser_Impl.inl # End Source File
# Begin Source File
-SOURCE=.\GIOP_Message_Lite.i
-# End Source File
-# Begin Source File
-
SOURCE=.\GIOP_Message_Locate_Header.i
# End Source File
# Begin Source File
-SOURCE=.\GIOP_Message_Reactive_Handler.inl
-# End Source File
-# Begin Source File
-
-SOURCE=.\GIOP_Message_State.i
+SOURCE=.\GIOP_Message_State.inl
# End Source File
# Begin Source File
@@ -1923,6 +1915,10 @@ SOURCE=.\IIOPC.i # End Source File
# Begin Source File
+SOURCE=.\Incoming_Message_Queue.inl
+# End Source File
+# Begin Source File
+
SOURCE=.\Interceptor_List.inl
# End Source File
# Begin Source File
@@ -2103,6 +2099,10 @@ SOURCE=.\Reply_Dispatcher.i # End Source File
# Begin Source File
+SOURCE=.\Resume_Handle.inl
+# End Source File
+# Begin Source File
+
SOURCE=.\sequence.i
# End Source File
# Begin Source File
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp index be4f08f456a..7654dba29c0 100644 --- a/TAO/tao/Transport.cpp +++ b/TAO/tao/Transport.cpp @@ -18,6 +18,7 @@ #include "Flushing_Strategy.h" #include "Transport_Cache_Manager.h" #include "debug.h" +#include "Resume_Handle.h" #include "ace/Message_Block.h" @@ -786,7 +787,7 @@ TAO_Transport::generate_request_header ( } int -TAO_Transport::handle_input_i (ACE_HANDLE h, +TAO_Transport::handle_input_i (TAO_Resume_Handle &rh, ACE_Time_Value * max_wait_time, int /*block*/) { @@ -842,22 +843,27 @@ TAO_Transport::handle_input_i (ACE_HANDLE h, if (missing_data < 0) { - this->consolidate_extra_messages (message_block); + return this->consolidate_extra_messages (message_block, + rh); } else if (missing_data > 0) { - return this->consolidate_process_message (message_block, - missing_data, - h, - max_wait_time); + return this->consolidate_message (message_block, + missing_data, + rh, + max_wait_time); } + TAO_Queued_Data qd; + qd.msg_block_ = &message_block; + qd.missing_data_ = missing_data; + + this->messaging_object ()->get_message_data (&qd); + // @@Bala: - return this->process_parsed_messages ( - message_block, - this->messaging_object ()->byte_order (), - h); + return this->process_parsed_messages (&qd, + rh); } @@ -865,14 +871,11 @@ TAO_Transport::handle_input_i (ACE_HANDLE h, int TAO_Transport::parse_incoming_messages (ACE_Message_Block &message_block) { - // @@Bala:What about requests whose headers have been completely - // read in the last read???? - // If we have a queue and if the last message is not complete a // complete one, then this read will get us the remaining data. So // do not try to parse the header if we have an incomplete message // in the queue. - if (!this->incoming_message_queue_.is_complete_message ()) + if (this->incoming_message_queue_.is_tail_complete () == 0) { return 0; } @@ -899,7 +902,7 @@ TAO_Transport::missing_data (ACE_Message_Block &incoming) { // If we have a incomplete message in the queue then find out how // much of data is required to get a complete message - if (!this->incoming_message_queue_.is_complete_message ()) + if (this->incoming_message_queue_.is_tail_complete () == 0) { return this->incoming_message_queue_.missing_data (); } @@ -909,18 +912,16 @@ TAO_Transport::missing_data (ACE_Message_Block &incoming) int -TAO_Transport::consolidate_process_message (ACE_Message_Block &incoming, - ssize_t missing_data, - ACE_HANDLE h, - ACE_Time_Value *max_wait_time) +TAO_Transport::consolidate_message (ACE_Message_Block &incoming, + ssize_t missing_data, + TAO_Resume_Handle &rh, + ACE_Time_Value *max_wait_time) { - // The write pointer which will be used for reading data from the - // socket. - if (!this->incoming_message_queue_.is_complete_message ()) + if (this->incoming_message_queue_.is_tail_complete () == 0) { return this->consolidate_message_queue (incoming, missing_data, - h, + rh, max_wait_time); } @@ -948,48 +949,68 @@ TAO_Transport::consolidate_process_message (ACE_Message_Block &incoming, // ..Decrement missing_data -= n; - // Get the byte order information - CORBA::Octet byte_order = - this->messaging_object ()->byte_order (); - if (missing_data > 0) { // Duplicate the message block ACE_Message_Block *mb = incoming.duplicate (); - // Stick the message in queue with the byte order information - if (this->incoming_message_queue_.add_message (mb, - missing_data, - byte_order) == -1) - { - return -1; - } + // Get an instance of TAO_Queued_Data + TAO_Queued_Data *qd = + TAO_Queued_Data::get_queued_data (); + + qd->missing_data_ = missing_data; + qd->msg_block_ = mb; + + this->messaging_object ()->get_message_data (qd); + + this->incoming_message_queue_.enqueue_tail (qd); + return 0; } + TAO_Queued_Data pqd; + pqd.msg_block_ = &incoming; + pqd.missing_data_ = missing_data; + + this->messaging_object ()->get_message_data (&pqd); + // Now we have a full message in our buffer. Just go ahead and // process that - return this->process_parsed_messages (incoming, - byte_order, - h); + return this->process_parsed_messages (&pqd, + rh); } int TAO_Transport::consolidate_message_queue (ACE_Message_Block &incoming, ssize_t missing_data, - ACE_HANDLE h, + TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time) { // If the queue did not have a complete message put this piece of - // message in the queue + // message in the queue. We kow it did not have a complete + // message. That is why we are here. this->incoming_message_queue_.copy_message (incoming); missing_data = this->incoming_message_queue_.missing_data (); + + // @@todo: What will happen if we have a part of the next message in + // the incoming message block? If that is a one-way call we handle + // it differently. We will be in soup if the next message is a + // two-way call. We need to process that too.... Can we call + // process_messages () with rd_ptr () of teh incoming_message (), + // moved? + if (missing_data > 0) { - // Read the message into the last node of the message queue.. - ssize_t n = this->recv (this->incoming_message_queue_.wr_ptr (), + // Get the last message from the Queue + TAO_Queued_Data *qd = + this->incoming_message_queue_.dequeue_tail (); + + ACE_Message_Block *mb = + qd->msg_block_; + + ssize_t n = this->recv (mb->wr_ptr (), missing_data, max_wait_time); @@ -998,40 +1019,60 @@ TAO_Transport::consolidate_message_queue (ACE_Message_Block &incoming, return n; // Move the write pointer - incoming.wr_ptr (n); + mb->wr_ptr (n); // Decrement the missing data - this->incoming_message_queue_.queued_data_->missing_data_ -= n; - } + qd->missing_data_ -= n; + // Now put the TAO_Queued_Data back in the queue + this->incoming_message_queue_.enqueue_tail (qd); + } - if (!this->incoming_message_queue_.is_complete_message ()) + // See if the message in the head of the queue is complete... + if (this->incoming_message_queue_.is_head_complete () == 1) { - return 0; - } + // Get the message on the head of the queue.. + TAO_Queued_Data *qd = + this->incoming_message_queue_.dequeue_head (); - CORBA::Octet byte_order = 0; + // Process the message... + if (this->process_parsed_messages (qd, + rh) == -1) + return -1; - // Get the message on the head of the queue.. - ACE_Message_Block *msg_block = - this->incoming_message_queue_.dequeue_head (byte_order); + // Delete the message block first + // delete qd->msg_block_; - // Process the message... - if (this->process_parsed_messages (*msg_block, - byte_order, - h) == -1) - return -1; + // Delete the Queued_Data.. + delete qd; - // Delete the message block... - delete msg_block; + } return 0; } int -TAO_Transport::consolidate_extra_messages (ACE_Message_Block &incoming) +TAO_Transport::consolidate_extra_messages (ACE_Message_Block + &incoming, + TAO_Resume_Handle &rh) { + // @@Bala: What about messages that dont even have their first few + // bytes in... + + // Take a message from the tail.. + TAO_Queued_Data *tail = + this->incoming_message_queue_.dequeue_tail (); + + if (tail ) + { + if (this->messaging_object ()->consolidate_node (tail, + incoming) == -1) + return -1; + // .. put the tail back in queue.. + this->incoming_message_queue_.enqueue_tail (tail); + } + int retval = 1; while (retval == 1) { @@ -1041,26 +1082,47 @@ TAO_Transport::consolidate_extra_messages (ACE_Message_Block &incoming) this->messaging_object ()->extract_next_message (incoming, q_data); if (q_data) - this->incoming_message_queue_.add_node (qd); + this->incoming_message_queue_.enqueue_tail (q_data); + } + + + + // See if the message in the head of the queue is complete... + if (this->incoming_message_queue_.is_head_complete () == 1) + { + // Get the message on the head of the queue.. + TAO_Queued_Data *qd = + this->incoming_message_queue_.dequeue_head (); + + // Process the message... + if (this->process_parsed_messages (qd, + rh) == -1) + return -1; + + // Delete the message_block + // delete qd->msg_block_; + + // Delete the Queued_Data.. + delete qd; } if (retval == -1) - return retval; + { + return retval; + } return 0; } int -TAO_Transport::process_parsed_messages (ACE_Message_Block &message_block, - CORBA::Octet byte_order, - ACE_HANDLE h) +TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd, + TAO_Resume_Handle &rh) { - // If we have a complete message, just resume the handler - // Resume the handler. - // @@Bala: Try to solve this issue of reactor resumptions.. - this->orb_core_->reactor ()->resume_handler (h); + // As we have the message now just resume the handle.. + rh.resume_handle (); // Get the <message_type> that we have received + // @@Wrong.. We need to look at <qd> for this... TAO_Pluggable_Message_Type t = this->messaging_object ()->message_type (); @@ -1085,8 +1147,7 @@ TAO_Transport::process_parsed_messages (ACE_Message_Block &message_block, if (this->messaging_object ()->process_request_message ( this, this->orb_core (), - message_block, - byte_order) == -1) + qd) == -1) { // Close the TMS this->tms_->connection_closed (); @@ -1103,8 +1164,7 @@ TAO_Transport::process_parsed_messages (ACE_Message_Block &message_block, TAO_Pluggable_Reply_Params params (this->orb_core ()); if (this->messaging_object ()->process_reply_message (params, - message_block, - byte_order) == -1) + qd) == -1) { if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h index 5ec795a5035..f9ee20e120a 100644 --- a/TAO/tao/Transport.h +++ b/TAO/tao/Transport.h @@ -41,6 +41,7 @@ class TAO_Connection_Handler; class TAO_Pluggable_Messaging; class TAO_Queued_Message; +class TAO_Resume_Handle; class TAO_Export TAO_Synch_Refcountable : private ACE_Refcountable { @@ -488,7 +489,7 @@ public: * */ // @@ lockme - virtual int handle_input_i (ACE_HANDLE h = ACE_INVALID_HANDLE, + virtual int handle_input_i (TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time = 0, int block = 0); @@ -574,22 +575,22 @@ protected: int check_message_integrity (ACE_Message_Block &message_block); - int consolidate_process_message (ACE_Message_Block &incoming, - ssize_t missing_data, - ACE_HANDLE h, - ACE_Time_Value *max_wait_time); + int consolidate_message (ACE_Message_Block &incoming, + ssize_t missing_data, + TAO_Resume_Handle &rh, + ACE_Time_Value *max_wait_time); int consolidate_message_queue (ACE_Message_Block &incoming, ssize_t missing_data, - ACE_HANDLE h, + TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time); - void consolidate_extra_messages (ACE_Message_Block &incoming); + int consolidate_extra_messages (ACE_Message_Block &incoming, + TAO_Resume_Handle &rh); /// @@ Bala: Documentation - virtual int process_parsed_messages (ACE_Message_Block &message_block, - CORBA::Octet byte_order, - ACE_HANDLE h = ACE_INVALID_HANDLE); + virtual int process_parsed_messages (TAO_Queued_Data *qd, + TAO_Resume_Handle &rh); public: /// Method for the connection handler to signify that it diff --git a/TAO/tao/Wait_On_Read.cpp b/TAO/tao/Wait_On_Read.cpp index f582e02d496..29e983a90db 100644 --- a/TAO/tao/Wait_On_Read.cpp +++ b/TAO/tao/Wait_On_Read.cpp @@ -2,6 +2,7 @@ #include "tao/Wait_On_Read.h" #include "Transport.h" +#include "Resume_Handle.h" ACE_RCSID(tao, Wait_On_Read, "$Id$") @@ -26,10 +27,11 @@ TAO_Wait_On_Read::wait (ACE_Time_Value * max_wait_time, // Do the same sort of looping that is done in other wait // strategies. int retval = 0; + TAO_Resume_Handle rh; while (1) { retval = - this->transport_->handle_input_i (ACE_INVALID_HANDLE, + this->transport_->handle_input_i (rh, max_wait_time, 1); |