diff options
author | bala <balanatarajan@users.noreply.github.com> | 2001-06-26 00:26:01 +0000 |
---|---|---|
committer | bala <balanatarajan@users.noreply.github.com> | 2001-06-26 00:26:01 +0000 |
commit | dd8e7d30aff4087b2243f36e282e5923ec35ae41 (patch) | |
tree | c36685d175cf6f001b9b85be4a4f49a7fdd3d635 | |
parent | de0f6a7f015db1a546863c25e8237c4062ed8146 (diff) | |
download | ATCD-dd8e7d30aff4087b2243f36e282e5923ec35ae41.tar.gz |
ChangeLogTag: Mon Jun 25 19:21:43 2001 Balachandran Natarajan <bala@cs.wustl.edu>
-rw-r--r-- | TAO/tao/GIOP_Message_Base.cpp | 76 | ||||
-rw-r--r-- | TAO/tao/GIOP_Message_Base.h | 7 | ||||
-rw-r--r-- | TAO/tao/GIOP_Message_Base.i | 46 | ||||
-rw-r--r-- | TAO/tao/GIOP_Message_State.inl | 9 | ||||
-rw-r--r-- | TAO/tao/IIOP_Transport.cpp | 35 | ||||
-rw-r--r-- | TAO/tao/Incoming_Message_Queue.cpp | 2 | ||||
-rw-r--r-- | TAO/tao/Incoming_Message_Queue.h | 7 | ||||
-rw-r--r-- | TAO/tao/Incoming_Message_Queue.inl | 9 | ||||
-rw-r--r-- | TAO/tao/Pluggable_Messaging.h | 5 | ||||
-rw-r--r-- | TAO/tao/Transport.cpp | 90 | ||||
-rw-r--r-- | TAO/tao/Transport.h | 12 |
11 files changed, 192 insertions, 106 deletions
diff --git a/TAO/tao/GIOP_Message_Base.cpp b/TAO/tao/GIOP_Message_Base.cpp index e3882c63802..8fb375beb2b 100644 --- a/TAO/tao/GIOP_Message_Base.cpp +++ b/TAO/tao/GIOP_Message_Base.cpp @@ -318,7 +318,6 @@ int TAO_GIOP_Message_Base::parse_incoming_messages (ACE_Message_Block &incoming) { - if (this->message_state_.parse_message_header (incoming) == -1) { return -1; @@ -330,24 +329,73 @@ TAO_GIOP_Message_Base::parse_incoming_messages (ACE_Message_Block &incoming) return 0; } -size_t +ssize_t TAO_GIOP_Message_Base::missing_data (ACE_Message_Block &incoming) { - // @@Bala: Look for fragmentation here.. - // If we had recd. fragmented messages and if the GIOP minor version - // is greater than 1, then include the FRAGMENT HEADER to calculate - // the effective length of the message - /*if (this->message_state_.more_fragments_ && - this->message_state_.giop_version_.minor > 1) - len -= TAO_GIOP_MESSAGE_FRAGMENT_HEADER; - */ + //Actual message size including the header.. + CORBA::ULong msg_size = + this->message_state_.message_size (); + + ssize_t len = incoming.length (); + + if (len > msg_size) + { + // Move the rd_ptr so that we can extract the next message. + incoming.rd_ptr (msg_size); + return -1; + } + else if (len == msg_size) + return 0; + + return msg_size - len; +} + + +int +TAO_GIOP_Message_Base::extract_next_message (ACE_Message_Block &incoming, + TAO_Queued_Data *qd) +{ + TAO_GIOP_Message_State msg_state; + + if (incoming.length () < TAO_GIOP_MESSAGE_HEADER_LEN) + { + if (incoming.length () > 0) + { + qd = + this->make_queued_data (incoming.length ()); + + qd.missing_data_ = -1; + } + return 0; + } + + + if (msg_state.parse_message_header (incoming) == -1) + { + return -1; + } + + size_t copying_len = msg_state.message_size (); + + qd = this->make_queued_data (copying_len); + + if (copying_len > incoming.length ()) + { + qd.missing_data_ = + copying_len - incoming.length (); + + copying_len -= incoming.length (); + } - size_t len = incoming.length (); + new_mb.copy (incoming.rd_ptr (), + copying_len); - if (len >= this->message_state_.message_size ()) - return 0; + incoming.rd_ptr (copying_len); + qd.byte_order_ = msg_state.byte_order_; + qd.major_version_ = msg_state.giop_version_.major_version; + qd.minor_version_ = msg_state.giop_version_.minor_version; - return this->message_state_.message_size () - len; + return 1; } CORBA::Octet diff --git a/TAO/tao/GIOP_Message_Base.h b/TAO/tao/GIOP_Message_Base.h index 1ca368005fe..e3f03e6ebec 100644 --- a/TAO/tao/GIOP_Message_Base.h +++ b/TAO/tao/GIOP_Message_Base.h @@ -108,7 +108,10 @@ public: virtual int is_message_complete (ACE_Message_Block &message_block); /// @@Bala:Documentation please.. - virtual size_t missing_data (ACE_Message_Block &message_block); + virtual ssize_t missing_data (ACE_Message_Block &message_block); + + virtual int extract_next_message (ACE_Message_Block &incoming, + TAO_Queued_Data *qd); virtual CORBA::Octet byte_order (void); @@ -201,6 +204,8 @@ private: /// Are there any more messages that needs processing virtual int more_messages (void); + /// @@Bala:Docu?? + TAO_Queued_Data *make_queued_data (size_t sz); private: /// Thr message handler object that does reading and parsing of the diff --git a/TAO/tao/GIOP_Message_Base.i b/TAO/tao/GIOP_Message_Base.i index 45b18282a10..33bebef4aac 100644 --- a/TAO/tao/GIOP_Message_Base.i +++ b/TAO/tao/GIOP_Message_Base.i @@ -4,41 +4,25 @@ // // GIOP_Message_Base // -# if 0 -ACE_INLINE const size_t -TAO_GIOP_Message_Base::header_len (void) +TAO_Queued_Data * +GIOP_Message_Base::make_queued_data (size_t sz) { - return TAO_GIOP_MESSAGE_HEADER_LEN; -} + qd = TAO_Incoming_Message_Queue::get_queued_data (); -ACE_INLINE const size_t -TAO_GIOP_Message_Base::message_size_offset (void) -{ - return TAO_GIOP_MESSAGE_SIZE_OFFSET; -} + ACE_Data_Block *db = + this->orb_core_->data_block_for_message_block (sz); -ACE_INLINE const size_t -TAO_GIOP_Message_Base::major_version_offset (void) -{ - return TAO_GIOP_VERSION_MAJOR_OFFSET; -} + ACE_Allocator *alloc = + this->orb_core_->message_block_msgblock_allocator (); -ACE_INLINE const size_t -TAO_GIOP_Message_Base::minor_version_offset (void) -{ - return TAO_GIOP_VERSION_MINOR_OFFSET; -} + ACE_Message_Block mb (db, + 0, + alloc); -ACE_INLINE const size_t -TAO_GIOP_Message_Base::flags_offset (void) -{ - return TAO_GIOP_MESSAGE_FLAGS_OFFSET; -} + ACE_Message_Block *new_mb = mb.duplicate (); -ACE_INLINE const size_t -TAO_GIOP_Message_Base::message_type_offset (void) -{ - return TAO_GIOP_MESSAGE_TYPE_OFFSET; -} + qd.msg_block_ = new_mb; + ACE_CDR::mb_align (new_mb); -#endif /*if 0*/ + return qd; +} diff --git a/TAO/tao/GIOP_Message_State.inl b/TAO/tao/GIOP_Message_State.inl index 9238ba596d4..cf326079aad 100644 --- a/TAO/tao/GIOP_Message_State.inl +++ b/TAO/tao/GIOP_Message_State.inl @@ -5,7 +5,14 @@ ACE_INLINE CORBA::ULong TAO_GIOP_Message_State::message_size (void) const { - return this->message_size_ + TAO_GIOP_MESSAGE_HEADER_LEN; + CORBA::ULong len = + this->message_size_ + TAO_GIOP_MESSAGE_HEADER_LEN; + + if (this->more_fragments_ && + this->giop_version_.minor > 1) + len += TAO_GIOP_MESSAGE_FRAGMENT_HEADER; + + return len; } ACE_INLINE CORBA::ULong diff --git a/TAO/tao/IIOP_Transport.cpp b/TAO/tao/IIOP_Transport.cpp index 997226316cc..22f56d89dad 100644 --- a/TAO/tao/IIOP_Transport.cpp +++ b/TAO/tao/IIOP_Transport.cpp @@ -83,9 +83,38 @@ TAO_IIOP_Transport::recv_i (char *buf, size_t len, const ACE_Time_Value *max_wait_time) { - return this->connection_handler_->peer ().recv (buf, - len, - max_wait_time); + ssize_t n = this->connection_handler_->peer ().recv (buf, + len, + max_wait_time); + + // Most of the errors handling is common for + // Now the message has been read + if (n == -1 && TAO_debug_level > 0) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - %p \n"), + ACE_TEXT ("TAO - read message failure \n") + ACE_TEXT ("TAO - recv_i () \n"))); + } + + // Error handling + if (n == -1) + { + if (errno == EWOULDBLOCK) + return 0; + + // Close the connection + this->tms_->connection_closed (); + + return -1; + } + // @@ What are the other error handling here?? + else if (n == 0) + { + return -1; + } + + return n; } diff --git a/TAO/tao/Incoming_Message_Queue.cpp b/TAO/tao/Incoming_Message_Queue.cpp index e5b239b775c..910af84ab26 100644 --- a/TAO/tao/Incoming_Message_Queue.cpp +++ b/TAO/tao/Incoming_Message_Queue.cpp @@ -23,7 +23,7 @@ TAO_Incoming_Message_Queue::~TAO_Incoming_Message_Queue (void) int TAO_Incoming_Message_Queue::add_message (ACE_Message_Block *incoming, - size_t missing_data, + ssize_t missing_data, CORBA::Octet byte_order) { diff --git a/TAO/tao/Incoming_Message_Queue.h b/TAO/tao/Incoming_Message_Queue.h index f5c111f3e5f..9c5515a02d9 100644 --- a/TAO/tao/Incoming_Message_Queue.h +++ b/TAO/tao/Incoming_Message_Queue.h @@ -66,7 +66,8 @@ public: /// The actual message queue ACE_Message_Block *msg_block_; - CORBA::ULong missing_data_; + + CORBA::Long missing_data_; CORBA::Octet byte_order_; @@ -77,6 +78,7 @@ public: TAO_Queued_Data *next_; }; + static TAO_Queued_Data* get_queued_data (void); private: friend class TAO_Transport; @@ -84,8 +86,9 @@ private: /// @@Bala:Docu + TAO_Queued_Data *get_node (void); + - TAO_Queued_Data* get_node (void); int add_node (TAO_Queued_Data *nd); private: diff --git a/TAO/tao/Incoming_Message_Queue.inl b/TAO/tao/Incoming_Message_Queue.inl index 10713dcf732..8057696682d 100644 --- a/TAO/tao/Incoming_Message_Queue.inl +++ b/TAO/tao/Incoming_Message_Queue.inl @@ -32,9 +32,8 @@ TAO_Incoming_Message_Queue::missing_data (void) const } - ACE_INLINE TAO_Incoming_Message_Queue::TAO_Queued_Data * -TAO_Incoming_Message_Queue::get_node (void) +TAO_Incoming_Message_Queue::get_queued_data (void) { // @@TODO: Use the global pool for allocationg... TAO_Queued_Data *qd = 0; @@ -45,6 +44,12 @@ TAO_Incoming_Message_Queue::get_node (void) return qd; } +ACE_INLINE TAO_Incoming_Message_Queue::TAO_Queued_Data * +TAO_Incoming_Message_Queue::get_node (void) +{ + return TAO_Incoming_Message_Queue::get_queued_data (); +} + ACE_INLINE TAO_Incoming_Message_Queue::TAO_Queued_Data::TAO_Queued_Data (void) : msg_block_ (0), diff --git a/TAO/tao/Pluggable_Messaging.h b/TAO/tao/Pluggable_Messaging.h index 288d946999e..15801c595f7 100644 --- a/TAO/tao/Pluggable_Messaging.h +++ b/TAO/tao/Pluggable_Messaging.h @@ -126,10 +126,13 @@ public: /// @@Bala: Documentation please... virtual int is_message_complete (ACE_Message_Block &message_block) = 0; - virtual size_t missing_data (ACE_Message_Block &incoming) = 0; + virtual ssize_t missing_data (ACE_Message_Block &incoming) = 0; virtual CORBA::Octet byte_order (void) = 0; + virtual int extract_next_message (ACE_Message_Block &incoming, + TAO_Queued_Data *qd) = 0; + /// Parse the request message, make an upcall and send the reply back /// to the "request initiator" virtual int process_request_message (TAO_Transport *transport, diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp index cd15fe869d3..be4f08f456a 100644 --- a/TAO/tao/Transport.cpp +++ b/TAO/tao/Transport.cpp @@ -738,37 +738,7 @@ TAO_Transport::recv (char *buffer, return -1; // now call the template method - ssize_t n = - this->recv_i (buffer, len, timeout); - - // Most of the errors handling is common for - // Now the message has been read - if (n == -1 && TAO_debug_level > 0) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - %p \n"), - ACE_TEXT ("TAO - read message failure \n") - ACE_TEXT ("TAO - handle_input () \n"))); - } - - // Error handling - if (n == -1) - { - if (errno == EWOULDBLOCK) - return 0; - - // Close the connection - this->tms_->connection_closed (); - - return -1; - } - // @@ What are the other error handling here?? - else if (n == 0) - { - return -1; - } - - return n; + return this->recv_i (buffer, len, timeout); } @@ -867,15 +837,19 @@ TAO_Transport::handle_input_i (ACE_HANDLE h, return -1; // Check whether we have a complete message for processing - size_t missing_data = + ssize_t missing_data = this->missing_data (message_block); - if (missing_data) + if (missing_data < 0) + { + this->consolidate_extra_messages (message_block); + } + else if (missing_data > 0) { - return this->consolidate_message (message_block, - missing_data, - h, - max_wait_time); + return this->consolidate_process_message (message_block, + missing_data, + h, + max_wait_time); } @@ -891,6 +865,9 @@ TAO_Transport::handle_input_i (ACE_HANDLE h, int TAO_Transport::parse_incoming_messages (ACE_Message_Block &message_block) { + // @@Bala:What about requests whose headers have been completely + // read in the last read???? + // If we have a queue and if the last message is not complete a // complete one, then this read will get us the remaining data. So // do not try to parse the header if we have an incomplete message @@ -920,9 +897,9 @@ TAO_Transport::parse_incoming_messages (ACE_Message_Block &message_block) size_t TAO_Transport::missing_data (ACE_Message_Block &incoming) { - // If we have message in the queue then find out how much of data - // is required to get a complete message - if (this->incoming_message_queue_.queue_length ()) + // If we have a incomplete message in the queue then find out how + // much of data is required to get a complete message + if (!this->incoming_message_queue_.is_complete_message ()) { return this->incoming_message_queue_.missing_data (); } @@ -932,10 +909,10 @@ TAO_Transport::missing_data (ACE_Message_Block &incoming) int -TAO_Transport::consolidate_message (ACE_Message_Block &incoming, - size_t missing_data, - ACE_HANDLE h, - ACE_Time_Value *max_wait_time) +TAO_Transport::consolidate_process_message (ACE_Message_Block &incoming, + ssize_t missing_data, + ACE_HANDLE h, + ACE_Time_Value *max_wait_time) { // The write pointer which will be used for reading data from the // socket. @@ -1000,7 +977,7 @@ TAO_Transport::consolidate_message (ACE_Message_Block &incoming, int TAO_Transport::consolidate_message_queue (ACE_Message_Block &incoming, - size_t missing_data, + ssize_t missing_data, ACE_HANDLE h, ACE_Time_Value *max_wait_time) { @@ -1050,6 +1027,29 @@ TAO_Transport::consolidate_message_queue (ACE_Message_Block &incoming, return 0; } + + +int +TAO_Transport::consolidate_extra_messages (ACE_Message_Block &incoming) +{ + int retval = 1; + while (retval == 1) + { + TAO_Queued_Data *q_data = 0; + + retval = + this->messaging_object ()->extract_next_message (incoming, + q_data); + if (q_data) + this->incoming_message_queue_.add_node (qd); + } + + if (retval == -1) + return retval; + + return 0; +} + int TAO_Transport::process_parsed_messages (ACE_Message_Block &message_block, CORBA::Octet byte_order, diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h index 789761ee4e0..5ec795a5035 100644 --- a/TAO/tao/Transport.h +++ b/TAO/tao/Transport.h @@ -574,16 +574,18 @@ protected: int check_message_integrity (ACE_Message_Block &message_block); - int consolidate_message (ACE_Message_Block &incoming, - size_t missing_data, - ACE_HANDLE h, - ACE_Time_Value *max_wait_time); + int consolidate_process_message (ACE_Message_Block &incoming, + ssize_t missing_data, + ACE_HANDLE h, + ACE_Time_Value *max_wait_time); int consolidate_message_queue (ACE_Message_Block &incoming, - size_t missing_data, + ssize_t missing_data, ACE_HANDLE h, ACE_Time_Value *max_wait_time); + void consolidate_extra_messages (ACE_Message_Block &incoming); + /// @@ Bala: Documentation virtual int process_parsed_messages (ACE_Message_Block &message_block, CORBA::Octet byte_order, |