From e495c52789589659c59fcb760544a8a103e58c8f Mon Sep 17 00:00:00 2001 From: Johnny Willemsen Date: Tue, 24 Jul 2007 08:16:40 +0000 Subject: Tue Jul 24 08:17:23 UTC 2007 Johnny Willemsen --- TAO/ChangeLog | 11 ++ TAO/tao/Incoming_Message_Queue.cpp | 275 +---------------------------------- TAO/tao/Incoming_Message_Queue.h | 101 ------------- TAO/tao/Incoming_Message_Queue.inl | 34 ----- TAO/tao/Incoming_Message_Stack.h | 4 +- TAO/tao/Makefile.am | 3 + TAO/tao/Queued_Data.cpp | 284 +++++++++++++++++++++++++++++++++++++ TAO/tao/Queued_Data.h | 133 +++++++++++++++++ TAO/tao/Queued_Data.inl | 33 +++++ TAO/tao/tao.mpc | 2 + 10 files changed, 470 insertions(+), 410 deletions(-) create mode 100644 TAO/tao/Queued_Data.cpp create mode 100644 TAO/tao/Queued_Data.h create mode 100644 TAO/tao/Queued_Data.inl diff --git a/TAO/ChangeLog b/TAO/ChangeLog index 3e7921883f6..f6c01ed29f6 100644 --- a/TAO/ChangeLog +++ b/TAO/ChangeLog @@ -1,3 +1,14 @@ +Tue Jul 24 08:17:23 UTC 2007 Johnny Willemsen + + * tao/Queued_Data.{h,cpp,inl}: + Moved TAO_Queued_Data to its own file + + * tao/Incoming_Message_Queue.{h,cpp,inl}: + * tao/tao.mpc: + * tao/Makefile.am: + * tao/Incoming_Message_Stack.h: + Updated because of move of TAO_Queued_Data + Tue Jul 24 08:02:23 UTC 2007 Johnny Willemsen * orbsvcs/orbsvcs/IFRService/IFR_Service_Utils_T.cpp: diff --git a/TAO/tao/Incoming_Message_Queue.cpp b/TAO/tao/Incoming_Message_Queue.cpp index 83d1e749059..4b535e4eb4f 100644 --- a/TAO/tao/Incoming_Message_Queue.cpp +++ b/TAO/tao/Incoming_Message_Queue.cpp @@ -1,15 +1,14 @@ #include "tao/Incoming_Message_Queue.h" +#include "tao/Queued_Data.h" #include "tao/debug.h" #include "ace/Log_Msg.h" #include "ace/Malloc_Base.h" - #if !defined (__ACE_INLINE__) # include "tao/Incoming_Message_Queue.inl" #endif /* __ACE_INLINE__ */ - ACE_RCSID (tao, Incoming_Message_Queue, "$Id$") @@ -95,277 +94,7 @@ TAO_Incoming_Message_Queue::enqueue_tail (TAO_Queued_Data *nd) this->last_added_ = nd; } - ++ this->size_; - return 0; -} - - -/************************************************************************/ -// Methods for TAO_Queued_Data -/************************************************************************/ - -/*! - * @brief Allocate and return a new empty message block of size \a span_size - * mimicking parameters of \a mb. - * - * This function allocates a new aligned message block using the same - * allocators and flags as found in \a mb. The size of the new message - * block is at least \a span_size; the size may be adjusted up in order - * to accomodate alignment requirements and still fit \a span_size bytes - * into the aligned buffer. - * - * @param mb message block whose parameters should be mimicked - * @param span_size size of the new message block (will be adjusted for proper - * alignment) - * @return an aligned message block with rd_ptr sitting at correct - * alignment spot, 0 on failure - */ -static ACE_Message_Block* -clone_mb_nocopy_size (ACE_Message_Block *mb, size_t span_size) -{ - // Calculate the required size of the cloned block with alignment - size_t const aligned_size = ACE_CDR::first_size (span_size + ACE_CDR::MAX_ALIGNMENT); - - // Get the allocators - ACE_Allocator *data_allocator = 0; - ACE_Allocator *data_block_allocator = 0; - ACE_Allocator *message_block_allocator = 0; - mb->access_allocators (data_allocator, - data_block_allocator, - message_block_allocator); - - // Create a new Message Block - ACE_Message_Block *nb = 0; - ACE_NEW_MALLOC_RETURN (nb, - static_cast ( - message_block_allocator->malloc ( - sizeof (ACE_Message_Block))), - ACE_Message_Block(aligned_size, - mb->msg_type(), - mb->cont(), - 0, //we want the data block created - data_allocator, - mb->locking_strategy(), - mb->msg_priority(), - mb->msg_execution_time (), - mb->msg_deadline_time (), - data_block_allocator, - message_block_allocator), - 0); - - ACE_CDR::mb_align (nb); - - // Copy the flags over, but be SURE to clear the DONT_DELETE flag, since - // we just dynamically allocated the two things. - nb->set_flags (mb->flags()); - nb->clr_flags (ACE_Message_Block::DONT_DELETE); - - return nb; -} - -TAO_Queued_Data::TAO_Queued_Data (ACE_Allocator *alloc) - : msg_block_ (0), - missing_data_ (0), - major_version_ (0), - minor_version_ (0), - byte_order_ (0), - more_fragments_ (0), - msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR), - next_ (0), - allocator_ (alloc) -{ -} - -TAO_Queued_Data::TAO_Queued_Data (ACE_Message_Block *mb, - ACE_Allocator *alloc) - : msg_block_ (mb), - missing_data_ (0), - major_version_ (0), - minor_version_ (0), - byte_order_ (0), - more_fragments_ (0), - msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR), - next_ (0), - allocator_ (alloc) -{ -} - -TAO_Queued_Data::TAO_Queued_Data (const TAO_Queued_Data &qd) - : msg_block_ (qd.msg_block_->duplicate ()), - missing_data_ (qd.missing_data_), - major_version_ (qd.major_version_), - minor_version_ (qd.minor_version_), - byte_order_ (qd.byte_order_), - more_fragments_ (qd.more_fragments_), - msg_type_ (qd.msg_type_), - next_ (0), - allocator_ (qd.allocator_) -{ -} - -/*static*/ -TAO_Queued_Data * -TAO_Queued_Data::make_queued_data (ACE_Allocator *message_buffer_alloc, - ACE_Allocator *input_cdr_alloc, - ACE_Data_Block *db) -{ - // Get a node for the queue.. - TAO_Queued_Data *qd = 0; - - if (message_buffer_alloc) - { - ACE_NEW_MALLOC_RETURN (qd, - static_cast ( - message_buffer_alloc->malloc (sizeof (TAO_Queued_Data))), - TAO_Queued_Data (message_buffer_alloc), - 0); - - } - else - { - // No allocator, so use the global pool! - ACE_NEW_RETURN (qd, - TAO_Queued_Data, - 0); - } - - // Providing an ACE_Data_Block indicates that the caller wants - // an aligned ACE_Message_Block added to the TAO_Queued_Data. - if (db != 0) - { - // If this allocation fails, the TAO_Queued_Data will be leaked. - if (input_cdr_alloc == 0) - ACE_NEW_RETURN (qd->msg_block_, - ACE_Message_Block (db, - 0, - input_cdr_alloc), - 0); - else - ACE_NEW_MALLOC_RETURN (qd->msg_block_, - static_cast ( - input_cdr_alloc->malloc (sizeof (ACE_Message_Block))), - ACE_Message_Block (db, - 0, - input_cdr_alloc), - 0); - - ACE_CDR::mb_align (qd->msg_block_); - } - - return qd; -} - -/*static*/ -void -TAO_Queued_Data::release (TAO_Queued_Data *qd) -{ - //// TODO - ACE_Message_Block::release (qd->msg_block_); - - if (qd->allocator_) - { - ACE_DES_FREE (qd, - qd->allocator_->free, - TAO_Queued_Data); - - return; - } - - // @@todo: Need to be removed at some point of time! - if (TAO_debug_level == 4) - { - // This debug is for testing purposes! - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Queued_Data[%d]::release\n", - "Using global pool for releasing \n")); - } - delete qd; - -} - - -TAO_Queued_Data * -TAO_Queued_Data::duplicate (TAO_Queued_Data &sqd) -{ - // Check to see if the underlying block is on the stack. If not it - // is fine. If the datablock is on stack, try to make a copy of that - // before doing a duplicate. - // @@ todo: Theoretically this should be within the Message Block, - // but we dont have much scope to do this in that mess. Probably in - // the next stage of MB rewrite we should be okay - ACE_Message_Block::Message_Flags fl = - sqd.msg_block_->self_flags (); - - if (ACE_BIT_ENABLED (fl, - ACE_Message_Block::DONT_DELETE)) - (void) TAO_Queued_Data::replace_data_block (*sqd.msg_block_); - - - TAO_Queued_Data *qd = 0; - - if (sqd.allocator_) - { - ACE_NEW_MALLOC_RETURN (qd, - static_cast ( - sqd.allocator_->malloc (sizeof (TAO_Queued_Data))), - TAO_Queued_Data (sqd), - 0); - - return qd; - } - - // No allocator, so use the global pool! - // @@ TODO: We should be removing this at some point of time! - if (TAO_debug_level == 4) - { - // This debug is for testing purposes! - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Queued_Data[%d]::duplicate\n", - "Using global pool for allocation \n")); - } - - ACE_NEW_RETURN (qd, - TAO_Queued_Data (sqd), - 0); - - return qd; -} - -int -TAO_Queued_Data::consolidate (void) -{ - // Is this a chain of fragments? - if (this->more_fragments_ && this->msg_block_->cont () != 0) - { - // Create a message block big enough to hold the entire chain - ACE_Message_Block *dest = clone_mb_nocopy_size ( - this->msg_block_, - this->msg_block_->total_length ()); - - if (0 == dest) - { - // out of memory - return -1; - } - // Memory allocation succeeded, the new message block can hold the consolidated - // message. The following code just copies all the data into this new message block. - // No further memory allocation will take place. - - // Reset the cont() parameter. We have cloned the message - // block but not the chain as we will no longer have chain. - dest->cont (0); - - // Use ACE_CDR to consolidate the chain for us - ACE_CDR::consolidate (dest, this->msg_block_); - - // free the original message block chain - this->msg_block_->release (); - - // Set the message block to the new consolidated message block - this->msg_block_ = dest; - this->more_fragments_ = 0; - } - + ++this->size_; return 0; } diff --git a/TAO/tao/Incoming_Message_Queue.h b/TAO/tao/Incoming_Message_Queue.h index a70bfcbbcda..91f257f823c 100644 --- a/TAO/tao/Incoming_Message_Queue.h +++ b/TAO/tao/Incoming_Message_Queue.h @@ -22,10 +22,6 @@ # pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ -ACE_BEGIN_VERSIONED_NAMESPACE_DECL -class ACE_Allocator; -ACE_END_VERSIONED_NAMESPACE_DECL - TAO_BEGIN_VERSIONED_NAMESPACE_DECL class TAO_ORB_Core; @@ -98,103 +94,6 @@ private: /// probably parsing the header failed due to insufficient data in buffer. const size_t TAO_MISSING_DATA_UNDEFINED = ~((size_t) 0); // MAX_SIZE_T -/************************************************************************/ - -/** - * @class TAO_Queued_Data - * - * @brief Represents a node in the queue of incoming messages. - * - * This class contains necessary information about a message that is - * stored in the queue. Such a node can be used by the incoming thread - * from the reactor to dequeue and process the message by sending it - * to the higher layers of the ORB. - * - * The ACE_Message_Block contained within this class may contain a chain - * of message blocks (usually when GIOP fragments are involved). In that - * case consolidate () needs to be called prior to being sent to higher - * layers of the ORB when the GIOP fragment chain is complete. - */ - -class TAO_Export TAO_Queued_Data -{ -public: - /// Default Constructor - TAO_Queued_Data (ACE_Allocator *alloc = 0); - - /// Constructor. - TAO_Queued_Data (ACE_Message_Block *mb, ACE_Allocator *alloc = 0); - - /// Copy constructor. - TAO_Queued_Data (const TAO_Queued_Data &qd); - - /// Creation of a node in the queue. - static TAO_Queued_Data* make_queued_data ( - ACE_Allocator *message_buffer_alloc = 0, - ACE_Allocator *input_cdr_alloc = 0, - ACE_Data_Block *db = 0); - - /// Deletion of a node from the queue. - static void release (TAO_Queued_Data *qd); - - /// Duplicate ourselves. This creates a copy of ourselves on the - /// heap and returns a pointer to the duplicated node. - static TAO_Queued_Data* duplicate (TAO_Queued_Data &qd); - - /// Consolidate this fragments chained message blocks into one. - /// @return -1 if consolidation failed, eg out or memory, otherwise 0 - int consolidate (void); - -public: - - /// The message block that contains the message. - ACE_Message_Block *msg_block_; - - /*! - @name Missing Data details - - The \a missing_data_ member contains the number of bytes of - data missing from \a msg_block_. - */ - //@{ - /*! Data missing in the above message that hasn't been read or processed yet, - the value TAO_MISSING_DATA_UNDEFINED indicates it hasn't been processed yet, - otherwise greater or equal zero. */ - size_t missing_data_; - //@} - - /// Many protocols like GIOP have a major and minor version - /// information that would be needed to read and decipher the - /// message. - CORBA::Octet major_version_; - CORBA::Octet minor_version_; - - /// The byte order of the message that is stored in the node. - CORBA::Octet byte_order_; - - /// Some messages can be fragmented by the protocol (this is an ORB - /// level fragmentation on top of the TCP/IP fragmentation. This - /// member indicates whether the message that we have recd. and - /// queue already has more fragments that is missing.. - CORBA::Octet more_fragments_; - - /// The message type of the message - TAO_Pluggable_Message_Type msg_type_; - - /// Pounter to the next element in the queue. - TAO_Queued_Data *next_; - -private: - /// Replace the datablock with a one allocated on the heap or - /// allocator - static void replace_data_block (ACE_Message_Block &mb); - -private: - - /// The allocator used to allocate this class. - ACE_Allocator *allocator_; -}; - TAO_END_VERSIONED_NAMESPACE_DECL #if defined (__ACE_INLINE__) diff --git a/TAO/tao/Incoming_Message_Queue.inl b/TAO/tao/Incoming_Message_Queue.inl index a5b8a8a06e1..484355fca1e 100644 --- a/TAO/tao/Incoming_Message_Queue.inl +++ b/TAO/tao/Incoming_Message_Queue.inl @@ -4,10 +4,6 @@ TAO_BEGIN_VERSIONED_NAMESPACE_DECL -/************************************************************************/ -// Methods for TAO_Incoming_Message_Queue -/************************************************************************/ - ACE_INLINE TAO_Incoming_Message_Queue::TAO_Incoming_Message_Queue (TAO_ORB_Core *orb_core) : last_added_ (0), @@ -22,34 +18,4 @@ TAO_Incoming_Message_Queue::queue_length (void) const return this->size_; } -/************************************************************************/ -// Methods for TAO_Queued_Data -/************************************************************************/ - -/*static*/ -ACE_INLINE void -TAO_Queued_Data::replace_data_block (ACE_Message_Block &mb) -{ - size_t const newsize = - ACE_CDR::total_length (&mb, 0) + ACE_CDR::MAX_ALIGNMENT; - - ACE_Data_Block *db = - mb.data_block ()->clone_nocopy (); - - if (db->size (newsize) == -1) - return; - - ACE_Message_Block tmp (db); - ACE_CDR::mb_align (&tmp); - - tmp.copy (mb.rd_ptr (), mb.length()); - mb.data_block (tmp.data_block ()->duplicate ()); - - mb.rd_ptr (tmp.rd_ptr ()); - mb.wr_ptr (tmp.wr_ptr ()); - - // Remove the DONT_DELETE flags from mb - mb.clr_self_flags (ACE_Message_Block::DONT_DELETE); -} - TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/Incoming_Message_Stack.h b/TAO/tao/Incoming_Message_Stack.h index 109e6361ad0..936d2f50a20 100644 --- a/TAO/tao/Incoming_Message_Stack.h +++ b/TAO/tao/Incoming_Message_Stack.h @@ -15,13 +15,13 @@ #include /**/ "ace/pre.h" -#include "tao/Incoming_Message_Queue.h" +#include "tao/Queued_Data.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ -#if defined (__BORLANDC__) && (__BORLANDC__ <= 0x582) +#if defined (__BORLANDC__) && (__BORLANDC__ <= 0x590) #include /**/ "tao/TAO_Export.h" #endif diff --git a/TAO/tao/Makefile.am b/TAO/tao/Makefile.am index 31fd33614f6..bd00aa2ecd9 100644 --- a/TAO/tao/Makefile.am +++ b/TAO/tao/Makefile.am @@ -880,6 +880,7 @@ libTAO_la_SOURCES = \ Profile_Transport_Resolver.cpp \ Protocol_Factory.cpp \ Protocols_Hooks.cpp \ + Queued_Data.cpp \ Queued_Message.cpp \ Reactive_Connect_Strategy.cpp \ Reactive_Flushing_Strategy.cpp \ @@ -1308,6 +1309,8 @@ nobase_include_HEADERS = \ Pseudo_VarOut_T.cpp \ Pseudo_VarOut_T.h \ Pseudo_VarOut_T.inl \ + Queued_Data.h \ + Queued_Data.inl \ Queued_Message.h \ Range_Checking_T.h \ Reactive_Connect_Strategy.h \ diff --git a/TAO/tao/Queued_Data.cpp b/TAO/tao/Queued_Data.cpp new file mode 100644 index 00000000000..35673d865b1 --- /dev/null +++ b/TAO/tao/Queued_Data.cpp @@ -0,0 +1,284 @@ +#include "tao/Queued_Data.h" +#include "tao/debug.h" + +#include "ace/Log_Msg.h" +#include "ace/Malloc_Base.h" + + +#if !defined (__ACE_INLINE__) +# include "tao/Queued_Data.inl" +#endif /* __ACE_INLINE__ */ + + +ACE_RCSID (tao, + Queued_Data, + "$Id$") + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +/*! + * @brief Allocate and return a new empty message block of size \a span_size + * mimicking parameters of \a mb. + * + * This function allocates a new aligned message block using the same + * allocators and flags as found in \a mb. The size of the new message + * block is at least \a span_size; the size may be adjusted up in order + * to accomodate alignment requirements and still fit \a span_size bytes + * into the aligned buffer. + * + * @param mb message block whose parameters should be mimicked + * @param span_size size of the new message block (will be adjusted for proper + * alignment) + * @return an aligned message block with rd_ptr sitting at correct + * alignment spot, 0 on failure + */ +static ACE_Message_Block* +clone_mb_nocopy_size (ACE_Message_Block *mb, size_t span_size) +{ + // Calculate the required size of the cloned block with alignment + size_t const aligned_size = ACE_CDR::first_size (span_size + ACE_CDR::MAX_ALIGNMENT); + + // Get the allocators + ACE_Allocator *data_allocator = 0; + ACE_Allocator *data_block_allocator = 0; + ACE_Allocator *message_block_allocator = 0; + mb->access_allocators (data_allocator, + data_block_allocator, + message_block_allocator); + + // Create a new Message Block + ACE_Message_Block *nb = 0; + ACE_NEW_MALLOC_RETURN (nb, + static_cast ( + message_block_allocator->malloc ( + sizeof (ACE_Message_Block))), + ACE_Message_Block(aligned_size, + mb->msg_type(), + mb->cont(), + 0, //we want the data block created + data_allocator, + mb->locking_strategy(), + mb->msg_priority(), + mb->msg_execution_time (), + mb->msg_deadline_time (), + data_block_allocator, + message_block_allocator), + 0); + + ACE_CDR::mb_align (nb); + + // Copy the flags over, but be SURE to clear the DONT_DELETE flag, since + // we just dynamically allocated the two things. + nb->set_flags (mb->flags()); + nb->clr_flags (ACE_Message_Block::DONT_DELETE); + + return nb; +} + +TAO_Queued_Data::TAO_Queued_Data (ACE_Allocator *alloc) + : msg_block_ (0), + missing_data_ (0), + major_version_ (0), + minor_version_ (0), + byte_order_ (0), + more_fragments_ (0), + msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR), + next_ (0), + allocator_ (alloc) +{ +} + +TAO_Queued_Data::TAO_Queued_Data (ACE_Message_Block *mb, + ACE_Allocator *alloc) + : msg_block_ (mb), + missing_data_ (0), + major_version_ (0), + minor_version_ (0), + byte_order_ (0), + more_fragments_ (0), + msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR), + next_ (0), + allocator_ (alloc) +{ +} + +TAO_Queued_Data::TAO_Queued_Data (const TAO_Queued_Data &qd) + : msg_block_ (qd.msg_block_->duplicate ()), + missing_data_ (qd.missing_data_), + major_version_ (qd.major_version_), + minor_version_ (qd.minor_version_), + byte_order_ (qd.byte_order_), + more_fragments_ (qd.more_fragments_), + msg_type_ (qd.msg_type_), + next_ (0), + allocator_ (qd.allocator_) +{ +} + +/*static*/ +TAO_Queued_Data * +TAO_Queued_Data::make_queued_data (ACE_Allocator *message_buffer_alloc, + ACE_Allocator *input_cdr_alloc, + ACE_Data_Block *db) +{ + // Get a node for the queue.. + TAO_Queued_Data *qd = 0; + + if (message_buffer_alloc) + { + ACE_NEW_MALLOC_RETURN (qd, + static_cast ( + message_buffer_alloc->malloc (sizeof (TAO_Queued_Data))), + TAO_Queued_Data (message_buffer_alloc), + 0); + + } + else + { + // No allocator, so use the global pool! + ACE_NEW_RETURN (qd, + TAO_Queued_Data, + 0); + } + + // Providing an ACE_Data_Block indicates that the caller wants + // an aligned ACE_Message_Block added to the TAO_Queued_Data. + if (db != 0) + { + // If this allocation fails, the TAO_Queued_Data will be leaked. + if (input_cdr_alloc == 0) + ACE_NEW_RETURN (qd->msg_block_, + ACE_Message_Block (db, + 0, + input_cdr_alloc), + 0); + else + ACE_NEW_MALLOC_RETURN (qd->msg_block_, + static_cast ( + input_cdr_alloc->malloc (sizeof (ACE_Message_Block))), + ACE_Message_Block (db, + 0, + input_cdr_alloc), + 0); + + ACE_CDR::mb_align (qd->msg_block_); + } + + return qd; +} + +/*static*/ +void +TAO_Queued_Data::release (TAO_Queued_Data *qd) +{ + //// TODO + ACE_Message_Block::release (qd->msg_block_); + + if (qd->allocator_) + { + ACE_DES_FREE (qd, + qd->allocator_->free, + TAO_Queued_Data); + + return; + } + + // @@todo: Need to be removed at some point of time! + if (TAO_debug_level == 4) + { + // This debug is for testing purposes! + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Queued_Data[%d]::release\n", + "Using global pool for releasing \n")); + } + delete qd; + +} + + +TAO_Queued_Data * +TAO_Queued_Data::duplicate (TAO_Queued_Data &sqd) +{ + // Check to see if the underlying block is on the stack. If not it + // is fine. If the datablock is on stack, try to make a copy of that + // before doing a duplicate. + // @@ todo: Theoretically this should be within the Message Block, + // but we dont have much scope to do this in that mess. Probably in + // the next stage of MB rewrite we should be okay + ACE_Message_Block::Message_Flags fl = + sqd.msg_block_->self_flags (); + + if (ACE_BIT_ENABLED (fl, + ACE_Message_Block::DONT_DELETE)) + (void) TAO_Queued_Data::replace_data_block (*sqd.msg_block_); + + + TAO_Queued_Data *qd = 0; + + if (sqd.allocator_) + { + ACE_NEW_MALLOC_RETURN (qd, + static_cast ( + sqd.allocator_->malloc (sizeof (TAO_Queued_Data))), + TAO_Queued_Data (sqd), + 0); + + return qd; + } + + // No allocator, so use the global pool! + // @@ TODO: We should be removing this at some point of time! + if (TAO_debug_level == 4) + { + // This debug is for testing purposes! + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Queued_Data[%d]::duplicate\n", + "Using global pool for allocation \n")); + } + + ACE_NEW_RETURN (qd, + TAO_Queued_Data (sqd), + 0); + + return qd; +} + +int +TAO_Queued_Data::consolidate (void) +{ + // Is this a chain of fragments? + if (this->more_fragments_ && this->msg_block_->cont () != 0) + { + // Create a message block big enough to hold the entire chain + ACE_Message_Block *dest = clone_mb_nocopy_size ( + this->msg_block_, + this->msg_block_->total_length ()); + + if (0 == dest) + { + // out of memory + return -1; + } + // Memory allocation succeeded, the new message block can hold the consolidated + // message. The following code just copies all the data into this new message block. + // No further memory allocation will take place. + + // Reset the cont() parameter. We have cloned the message + // block but not the chain as we will no longer have chain. + dest->cont (0); + + // Use ACE_CDR to consolidate the chain for us + ACE_CDR::consolidate (dest, this->msg_block_); + + // free the original message block chain + this->msg_block_->release (); + + // Set the message block to the new consolidated message block + this->msg_block_ = dest; + this->more_fragments_ = 0; + } + + return 0; +} + +TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/Queued_Data.h b/TAO/tao/Queued_Data.h new file mode 100644 index 00000000000..3376cf48bff --- /dev/null +++ b/TAO/tao/Queued_Data.h @@ -0,0 +1,133 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file Queued_Data.h + * + * $Id$ + * + * @author Balachandran Natarajan + */ +//============================================================================= + +#ifndef TAO_QUEUED_DATA_H +#define TAO_QUEUED_DATA_H + +#include /**/ "ace/pre.h" + +#include "tao/Pluggable_Messaging_Utils.h" +#include "ace/Message_Block.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +ACE_BEGIN_VERSIONED_NAMESPACE_DECL +class ACE_Allocator; +ACE_END_VERSIONED_NAMESPACE_DECL + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +/** + * @class TAO_Queued_Data + * + * @brief Represents a node in the queue of incoming messages. + * + * This class contains necessary information about a message that is + * stored in the queue. Such a node can be used by the incoming thread + * from the reactor to dequeue and process the message by sending it + * to the higher layers of the ORB. + * + * The ACE_Message_Block contained within this class may contain a chain + * of message blocks (usually when GIOP fragments are involved). In that + * case consolidate () needs to be called prior to being sent to higher + * layers of the ORB when the GIOP fragment chain is complete. + */ + +class TAO_Export TAO_Queued_Data +{ +public: + /// Default Constructor + TAO_Queued_Data (ACE_Allocator *alloc = 0); + + /// Constructor. + TAO_Queued_Data (ACE_Message_Block *mb, ACE_Allocator *alloc = 0); + + /// Copy constructor. + TAO_Queued_Data (const TAO_Queued_Data &qd); + + /// Creation of a node in the queue. + static TAO_Queued_Data* make_queued_data ( + ACE_Allocator *message_buffer_alloc = 0, + ACE_Allocator *input_cdr_alloc = 0, + ACE_Data_Block *db = 0); + + /// Deletion of a node from the queue. + static void release (TAO_Queued_Data *qd); + + /// Duplicate ourselves. This creates a copy of ourselves on the + /// heap and returns a pointer to the duplicated node. + static TAO_Queued_Data* duplicate (TAO_Queued_Data &qd); + + /// Consolidate this fragments chained message blocks into one. + /// @return -1 if consolidation failed, eg out or memory, otherwise 0 + int consolidate (void); + +public: + + /// The message block that contains the message. + ACE_Message_Block *msg_block_; + + /*! + @name Missing Data details + + The \a missing_data_ member contains the number of bytes of + data missing from \a msg_block_. + */ + //@{ + /*! Data missing in the above message that hasn't been read or processed yet, + the value TAO_MISSING_DATA_UNDEFINED indicates it hasn't been processed yet, + otherwise greater or equal zero. */ + size_t missing_data_; + //@} + + /// Many protocols like GIOP have a major and minor version + /// information that would be needed to read and decipher the + /// message. + CORBA::Octet major_version_; + CORBA::Octet minor_version_; + + /// The byte order of the message that is stored in the node. + CORBA::Octet byte_order_; + + /// Some messages can be fragmented by the protocol (this is an ORB + /// level fragmentation on top of the TCP/IP fragmentation. This + /// member indicates whether the message that we have recd. and + /// queue already has more fragments that is missing.. + CORBA::Octet more_fragments_; + + /// The message type of the message + TAO_Pluggable_Message_Type msg_type_; + + /// Pounter to the next element in the queue. + TAO_Queued_Data *next_; + +private: + /// Replace the datablock with a one allocated on the heap or + /// allocator + static void replace_data_block (ACE_Message_Block &mb); + +private: + + /// The allocator used to allocate this class. + ACE_Allocator *allocator_; +}; + +TAO_END_VERSIONED_NAMESPACE_DECL + +#if defined (__ACE_INLINE__) +# include "tao/Queued_Data.inl" +#endif /* __ACE_INLINE__ */ + +#include /**/ "ace/post.h" +#endif /*TAO_QUEUED_DATA_H*/ diff --git a/TAO/tao/Queued_Data.inl b/TAO/tao/Queued_Data.inl new file mode 100644 index 00000000000..67b3cd89f4c --- /dev/null +++ b/TAO/tao/Queued_Data.inl @@ -0,0 +1,33 @@ +// -*- C++ -*- +// +//$Id$ + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +/*static*/ +ACE_INLINE void +TAO_Queued_Data::replace_data_block (ACE_Message_Block &mb) +{ + size_t const newsize = + ACE_CDR::total_length (&mb, 0) + ACE_CDR::MAX_ALIGNMENT; + + ACE_Data_Block *db = + mb.data_block ()->clone_nocopy (); + + if (db->size (newsize) == -1) + return; + + ACE_Message_Block tmp (db); + ACE_CDR::mb_align (&tmp); + + tmp.copy (mb.rd_ptr (), mb.length()); + mb.data_block (tmp.data_block ()->duplicate ()); + + mb.rd_ptr (tmp.rd_ptr ()); + mb.wr_ptr (tmp.wr_ptr ()); + + // Remove the DONT_DELETE flags from mb + mb.clr_self_flags (ACE_Message_Block::DONT_DELETE); +} + +TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/tao.mpc b/TAO/tao/tao.mpc index adb3f9f0f4b..673fcec795e 100644 --- a/TAO/tao/tao.mpc +++ b/TAO/tao/tao.mpc @@ -176,6 +176,7 @@ project(TAO) : acelib, install, tao_output, taodefaults, pidl, extra_core, tao_v Protocol_Factory.cpp Protocols_Hooks.cpp Network_Priority_Protocols_Hooks.cpp + Queued_Data.cpp Queued_Message.cpp Reactive_Connect_Strategy.cpp Reactive_Flushing_Strategy.cpp @@ -468,6 +469,7 @@ project(TAO) : acelib, install, tao_output, taodefaults, pidl, extra_core, tao_v Network_Priority_Protocols_Hooks.h Pseudo_VarOut_T.h Queued_Message.h + Queued_Data.h Range_Checking_T.h Reactive_Connect_Strategy.h Reactive_Flushing_Strategy.h -- cgit v1.2.1