summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohnny Willemsen <jwillemsen@remedy.nl>2007-07-24 08:16:40 +0000
committerJohnny Willemsen <jwillemsen@remedy.nl>2007-07-24 08:16:40 +0000
commite495c52789589659c59fcb760544a8a103e58c8f (patch)
tree8a01709cf37e6cf8aba91a1b97b739d04de4a660
parent55d4db4222d71ec9c930ff4f3c5e401d59e09d41 (diff)
downloadATCD-e495c52789589659c59fcb760544a8a103e58c8f.tar.gz
Tue Jul 24 08:17:23 UTC 2007 Johnny Willemsen <jwillemsen@remedy.nl>
-rw-r--r--TAO/ChangeLog11
-rw-r--r--TAO/tao/Incoming_Message_Queue.cpp275
-rw-r--r--TAO/tao/Incoming_Message_Queue.h101
-rw-r--r--TAO/tao/Incoming_Message_Queue.inl34
-rw-r--r--TAO/tao/Incoming_Message_Stack.h4
-rw-r--r--TAO/tao/Makefile.am3
-rw-r--r--TAO/tao/Queued_Data.cpp284
-rw-r--r--TAO/tao/Queued_Data.h133
-rw-r--r--TAO/tao/Queued_Data.inl33
-rw-r--r--TAO/tao/tao.mpc2
10 files changed, 470 insertions, 410 deletions
diff --git a/TAO/ChangeLog b/TAO/ChangeLog
index 3e7921883f6..f6c01ed29f6 100644
--- a/TAO/ChangeLog
+++ b/TAO/ChangeLog
@@ -1,3 +1,14 @@
+Tue Jul 24 08:17:23 UTC 2007 Johnny Willemsen <jwillemsen@remedy.nl>
+
+ * tao/Queued_Data.{h,cpp,inl}:
+ Moved TAO_Queued_Data to its own file
+
+ * tao/Incoming_Message_Queue.{h,cpp,inl}:
+ * tao/tao.mpc:
+ * tao/Makefile.am:
+ * tao/Incoming_Message_Stack.h:
+ Updated because of move of TAO_Queued_Data
+
Tue Jul 24 08:02:23 UTC 2007 Johnny Willemsen <jwillemsen@remedy.nl>
* orbsvcs/orbsvcs/IFRService/IFR_Service_Utils_T.cpp:
diff --git a/TAO/tao/Incoming_Message_Queue.cpp b/TAO/tao/Incoming_Message_Queue.cpp
index 83d1e749059..4b535e4eb4f 100644
--- a/TAO/tao/Incoming_Message_Queue.cpp
+++ b/TAO/tao/Incoming_Message_Queue.cpp
@@ -1,15 +1,14 @@
#include "tao/Incoming_Message_Queue.h"
+#include "tao/Queued_Data.h"
#include "tao/debug.h"
#include "ace/Log_Msg.h"
#include "ace/Malloc_Base.h"
-
#if !defined (__ACE_INLINE__)
# include "tao/Incoming_Message_Queue.inl"
#endif /* __ACE_INLINE__ */
-
ACE_RCSID (tao,
Incoming_Message_Queue,
"$Id$")
@@ -95,277 +94,7 @@ TAO_Incoming_Message_Queue::enqueue_tail (TAO_Queued_Data *nd)
this->last_added_ = nd;
}
- ++ this->size_;
- return 0;
-}
-
-
-/************************************************************************/
-// Methods for TAO_Queued_Data
-/************************************************************************/
-
-/*!
- * @brief Allocate and return a new empty message block of size \a span_size
- * mimicking parameters of \a mb.
- *
- * This function allocates a new aligned message block using the same
- * allocators and flags as found in \a mb. The size of the new message
- * block is at least \a span_size; the size may be adjusted up in order
- * to accomodate alignment requirements and still fit \a span_size bytes
- * into the aligned buffer.
- *
- * @param mb message block whose parameters should be mimicked
- * @param span_size size of the new message block (will be adjusted for proper
- * alignment)
- * @return an aligned message block with rd_ptr sitting at correct
- * alignment spot, 0 on failure
- */
-static ACE_Message_Block*
-clone_mb_nocopy_size (ACE_Message_Block *mb, size_t span_size)
-{
- // Calculate the required size of the cloned block with alignment
- size_t const aligned_size = ACE_CDR::first_size (span_size + ACE_CDR::MAX_ALIGNMENT);
-
- // Get the allocators
- ACE_Allocator *data_allocator = 0;
- ACE_Allocator *data_block_allocator = 0;
- ACE_Allocator *message_block_allocator = 0;
- mb->access_allocators (data_allocator,
- data_block_allocator,
- message_block_allocator);
-
- // Create a new Message Block
- ACE_Message_Block *nb = 0;
- ACE_NEW_MALLOC_RETURN (nb,
- static_cast<ACE_Message_Block*> (
- message_block_allocator->malloc (
- sizeof (ACE_Message_Block))),
- ACE_Message_Block(aligned_size,
- mb->msg_type(),
- mb->cont(),
- 0, //we want the data block created
- data_allocator,
- mb->locking_strategy(),
- mb->msg_priority(),
- mb->msg_execution_time (),
- mb->msg_deadline_time (),
- data_block_allocator,
- message_block_allocator),
- 0);
-
- ACE_CDR::mb_align (nb);
-
- // Copy the flags over, but be SURE to clear the DONT_DELETE flag, since
- // we just dynamically allocated the two things.
- nb->set_flags (mb->flags());
- nb->clr_flags (ACE_Message_Block::DONT_DELETE);
-
- return nb;
-}
-
-TAO_Queued_Data::TAO_Queued_Data (ACE_Allocator *alloc)
- : msg_block_ (0),
- missing_data_ (0),
- major_version_ (0),
- minor_version_ (0),
- byte_order_ (0),
- more_fragments_ (0),
- msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR),
- next_ (0),
- allocator_ (alloc)
-{
-}
-
-TAO_Queued_Data::TAO_Queued_Data (ACE_Message_Block *mb,
- ACE_Allocator *alloc)
- : msg_block_ (mb),
- missing_data_ (0),
- major_version_ (0),
- minor_version_ (0),
- byte_order_ (0),
- more_fragments_ (0),
- msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR),
- next_ (0),
- allocator_ (alloc)
-{
-}
-
-TAO_Queued_Data::TAO_Queued_Data (const TAO_Queued_Data &qd)
- : msg_block_ (qd.msg_block_->duplicate ()),
- missing_data_ (qd.missing_data_),
- major_version_ (qd.major_version_),
- minor_version_ (qd.minor_version_),
- byte_order_ (qd.byte_order_),
- more_fragments_ (qd.more_fragments_),
- msg_type_ (qd.msg_type_),
- next_ (0),
- allocator_ (qd.allocator_)
-{
-}
-
-/*static*/
-TAO_Queued_Data *
-TAO_Queued_Data::make_queued_data (ACE_Allocator *message_buffer_alloc,
- ACE_Allocator *input_cdr_alloc,
- ACE_Data_Block *db)
-{
- // Get a node for the queue..
- TAO_Queued_Data *qd = 0;
-
- if (message_buffer_alloc)
- {
- ACE_NEW_MALLOC_RETURN (qd,
- static_cast<TAO_Queued_Data *> (
- message_buffer_alloc->malloc (sizeof (TAO_Queued_Data))),
- TAO_Queued_Data (message_buffer_alloc),
- 0);
-
- }
- else
- {
- // No allocator, so use the global pool!
- ACE_NEW_RETURN (qd,
- TAO_Queued_Data,
- 0);
- }
-
- // Providing an ACE_Data_Block indicates that the caller wants
- // an aligned ACE_Message_Block added to the TAO_Queued_Data.
- if (db != 0)
- {
- // If this allocation fails, the TAO_Queued_Data will be leaked.
- if (input_cdr_alloc == 0)
- ACE_NEW_RETURN (qd->msg_block_,
- ACE_Message_Block (db,
- 0,
- input_cdr_alloc),
- 0);
- else
- ACE_NEW_MALLOC_RETURN (qd->msg_block_,
- static_cast<ACE_Message_Block*> (
- input_cdr_alloc->malloc (sizeof (ACE_Message_Block))),
- ACE_Message_Block (db,
- 0,
- input_cdr_alloc),
- 0);
-
- ACE_CDR::mb_align (qd->msg_block_);
- }
-
- return qd;
-}
-
-/*static*/
-void
-TAO_Queued_Data::release (TAO_Queued_Data *qd)
-{
- //// TODO
- ACE_Message_Block::release (qd->msg_block_);
-
- if (qd->allocator_)
- {
- ACE_DES_FREE (qd,
- qd->allocator_->free,
- TAO_Queued_Data);
-
- return;
- }
-
- // @@todo: Need to be removed at some point of time!
- if (TAO_debug_level == 4)
- {
- // This debug is for testing purposes!
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Queued_Data[%d]::release\n",
- "Using global pool for releasing \n"));
- }
- delete qd;
-
-}
-
-
-TAO_Queued_Data *
-TAO_Queued_Data::duplicate (TAO_Queued_Data &sqd)
-{
- // Check to see if the underlying block is on the stack. If not it
- // is fine. If the datablock is on stack, try to make a copy of that
- // before doing a duplicate.
- // @@ todo: Theoretically this should be within the Message Block,
- // but we dont have much scope to do this in that mess. Probably in
- // the next stage of MB rewrite we should be okay
- ACE_Message_Block::Message_Flags fl =
- sqd.msg_block_->self_flags ();
-
- if (ACE_BIT_ENABLED (fl,
- ACE_Message_Block::DONT_DELETE))
- (void) TAO_Queued_Data::replace_data_block (*sqd.msg_block_);
-
-
- TAO_Queued_Data *qd = 0;
-
- if (sqd.allocator_)
- {
- ACE_NEW_MALLOC_RETURN (qd,
- static_cast<TAO_Queued_Data *> (
- sqd.allocator_->malloc (sizeof (TAO_Queued_Data))),
- TAO_Queued_Data (sqd),
- 0);
-
- return qd;
- }
-
- // No allocator, so use the global pool!
- // @@ TODO: We should be removing this at some point of time!
- if (TAO_debug_level == 4)
- {
- // This debug is for testing purposes!
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Queued_Data[%d]::duplicate\n",
- "Using global pool for allocation \n"));
- }
-
- ACE_NEW_RETURN (qd,
- TAO_Queued_Data (sqd),
- 0);
-
- return qd;
-}
-
-int
-TAO_Queued_Data::consolidate (void)
-{
- // Is this a chain of fragments?
- if (this->more_fragments_ && this->msg_block_->cont () != 0)
- {
- // Create a message block big enough to hold the entire chain
- ACE_Message_Block *dest = clone_mb_nocopy_size (
- this->msg_block_,
- this->msg_block_->total_length ());
-
- if (0 == dest)
- {
- // out of memory
- return -1;
- }
- // Memory allocation succeeded, the new message block can hold the consolidated
- // message. The following code just copies all the data into this new message block.
- // No further memory allocation will take place.
-
- // Reset the cont() parameter. We have cloned the message
- // block but not the chain as we will no longer have chain.
- dest->cont (0);
-
- // Use ACE_CDR to consolidate the chain for us
- ACE_CDR::consolidate (dest, this->msg_block_);
-
- // free the original message block chain
- this->msg_block_->release ();
-
- // Set the message block to the new consolidated message block
- this->msg_block_ = dest;
- this->more_fragments_ = 0;
- }
-
+ ++this->size_;
return 0;
}
diff --git a/TAO/tao/Incoming_Message_Queue.h b/TAO/tao/Incoming_Message_Queue.h
index a70bfcbbcda..91f257f823c 100644
--- a/TAO/tao/Incoming_Message_Queue.h
+++ b/TAO/tao/Incoming_Message_Queue.h
@@ -22,10 +22,6 @@
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */
-ACE_BEGIN_VERSIONED_NAMESPACE_DECL
-class ACE_Allocator;
-ACE_END_VERSIONED_NAMESPACE_DECL
-
TAO_BEGIN_VERSIONED_NAMESPACE_DECL
class TAO_ORB_Core;
@@ -98,103 +94,6 @@ private:
/// probably parsing the header failed due to insufficient data in buffer.
const size_t TAO_MISSING_DATA_UNDEFINED = ~((size_t) 0); // MAX_SIZE_T
-/************************************************************************/
-
-/**
- * @class TAO_Queued_Data
- *
- * @brief Represents a node in the queue of incoming messages.
- *
- * This class contains necessary information about a message that is
- * stored in the queue. Such a node can be used by the incoming thread
- * from the reactor to dequeue and process the message by sending it
- * to the higher layers of the ORB.
- *
- * The ACE_Message_Block contained within this class may contain a chain
- * of message blocks (usually when GIOP fragments are involved). In that
- * case consolidate () needs to be called prior to being sent to higher
- * layers of the ORB when the GIOP fragment chain is complete.
- */
-
-class TAO_Export TAO_Queued_Data
-{
-public:
- /// Default Constructor
- TAO_Queued_Data (ACE_Allocator *alloc = 0);
-
- /// Constructor.
- TAO_Queued_Data (ACE_Message_Block *mb, ACE_Allocator *alloc = 0);
-
- /// Copy constructor.
- TAO_Queued_Data (const TAO_Queued_Data &qd);
-
- /// Creation of a node in the queue.
- static TAO_Queued_Data* make_queued_data (
- ACE_Allocator *message_buffer_alloc = 0,
- ACE_Allocator *input_cdr_alloc = 0,
- ACE_Data_Block *db = 0);
-
- /// Deletion of a node from the queue.
- static void release (TAO_Queued_Data *qd);
-
- /// Duplicate ourselves. This creates a copy of ourselves on the
- /// heap and returns a pointer to the duplicated node.
- static TAO_Queued_Data* duplicate (TAO_Queued_Data &qd);
-
- /// Consolidate this fragments chained message blocks into one.
- /// @return -1 if consolidation failed, eg out or memory, otherwise 0
- int consolidate (void);
-
-public:
-
- /// The message block that contains the message.
- ACE_Message_Block *msg_block_;
-
- /*!
- @name Missing Data details
-
- The \a missing_data_ member contains the number of bytes of
- data missing from \a msg_block_.
- */
- //@{
- /*! Data missing in the above message that hasn't been read or processed yet,
- the value TAO_MISSING_DATA_UNDEFINED indicates it hasn't been processed yet,
- otherwise greater or equal zero. */
- size_t missing_data_;
- //@}
-
- /// Many protocols like GIOP have a major and minor version
- /// information that would be needed to read and decipher the
- /// message.
- CORBA::Octet major_version_;
- CORBA::Octet minor_version_;
-
- /// The byte order of the message that is stored in the node.
- CORBA::Octet byte_order_;
-
- /// Some messages can be fragmented by the protocol (this is an ORB
- /// level fragmentation on top of the TCP/IP fragmentation. This
- /// member indicates whether the message that we have recd. and
- /// queue already has more fragments that is missing..
- CORBA::Octet more_fragments_;
-
- /// The message type of the message
- TAO_Pluggable_Message_Type msg_type_;
-
- /// Pounter to the next element in the queue.
- TAO_Queued_Data *next_;
-
-private:
- /// Replace the datablock with a one allocated on the heap or
- /// allocator
- static void replace_data_block (ACE_Message_Block &mb);
-
-private:
-
- /// The allocator used to allocate this class.
- ACE_Allocator *allocator_;
-};
-
TAO_END_VERSIONED_NAMESPACE_DECL
#if defined (__ACE_INLINE__)
diff --git a/TAO/tao/Incoming_Message_Queue.inl b/TAO/tao/Incoming_Message_Queue.inl
index a5b8a8a06e1..484355fca1e 100644
--- a/TAO/tao/Incoming_Message_Queue.inl
+++ b/TAO/tao/Incoming_Message_Queue.inl
@@ -4,10 +4,6 @@
TAO_BEGIN_VERSIONED_NAMESPACE_DECL
-/************************************************************************/
-// Methods for TAO_Incoming_Message_Queue
-/************************************************************************/
-
ACE_INLINE
TAO_Incoming_Message_Queue::TAO_Incoming_Message_Queue (TAO_ORB_Core *orb_core)
: last_added_ (0),
@@ -22,34 +18,4 @@ TAO_Incoming_Message_Queue::queue_length (void) const
return this->size_;
}
-/************************************************************************/
-// Methods for TAO_Queued_Data
-/************************************************************************/
-
-/*static*/
-ACE_INLINE void
-TAO_Queued_Data::replace_data_block (ACE_Message_Block &mb)
-{
- size_t const newsize =
- ACE_CDR::total_length (&mb, 0) + ACE_CDR::MAX_ALIGNMENT;
-
- ACE_Data_Block *db =
- mb.data_block ()->clone_nocopy ();
-
- if (db->size (newsize) == -1)
- return;
-
- ACE_Message_Block tmp (db);
- ACE_CDR::mb_align (&tmp);
-
- tmp.copy (mb.rd_ptr (), mb.length());
- mb.data_block (tmp.data_block ()->duplicate ());
-
- mb.rd_ptr (tmp.rd_ptr ());
- mb.wr_ptr (tmp.wr_ptr ());
-
- // Remove the DONT_DELETE flags from mb
- mb.clr_self_flags (ACE_Message_Block::DONT_DELETE);
-}
-
TAO_END_VERSIONED_NAMESPACE_DECL
diff --git a/TAO/tao/Incoming_Message_Stack.h b/TAO/tao/Incoming_Message_Stack.h
index 109e6361ad0..936d2f50a20 100644
--- a/TAO/tao/Incoming_Message_Stack.h
+++ b/TAO/tao/Incoming_Message_Stack.h
@@ -15,13 +15,13 @@
#include /**/ "ace/pre.h"
-#include "tao/Incoming_Message_Queue.h"
+#include "tao/Queued_Data.h"
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */
-#if defined (__BORLANDC__) && (__BORLANDC__ <= 0x582)
+#if defined (__BORLANDC__) && (__BORLANDC__ <= 0x590)
#include /**/ "tao/TAO_Export.h"
#endif
diff --git a/TAO/tao/Makefile.am b/TAO/tao/Makefile.am
index 31fd33614f6..bd00aa2ecd9 100644
--- a/TAO/tao/Makefile.am
+++ b/TAO/tao/Makefile.am
@@ -880,6 +880,7 @@ libTAO_la_SOURCES = \
Profile_Transport_Resolver.cpp \
Protocol_Factory.cpp \
Protocols_Hooks.cpp \
+ Queued_Data.cpp \
Queued_Message.cpp \
Reactive_Connect_Strategy.cpp \
Reactive_Flushing_Strategy.cpp \
@@ -1308,6 +1309,8 @@ nobase_include_HEADERS = \
Pseudo_VarOut_T.cpp \
Pseudo_VarOut_T.h \
Pseudo_VarOut_T.inl \
+ Queued_Data.h \
+ Queued_Data.inl \
Queued_Message.h \
Range_Checking_T.h \
Reactive_Connect_Strategy.h \
diff --git a/TAO/tao/Queued_Data.cpp b/TAO/tao/Queued_Data.cpp
new file mode 100644
index 00000000000..35673d865b1
--- /dev/null
+++ b/TAO/tao/Queued_Data.cpp
@@ -0,0 +1,284 @@
+#include "tao/Queued_Data.h"
+#include "tao/debug.h"
+
+#include "ace/Log_Msg.h"
+#include "ace/Malloc_Base.h"
+
+
+#if !defined (__ACE_INLINE__)
+# include "tao/Queued_Data.inl"
+#endif /* __ACE_INLINE__ */
+
+
+ACE_RCSID (tao,
+ Queued_Data,
+ "$Id$")
+
+TAO_BEGIN_VERSIONED_NAMESPACE_DECL
+
+/*!
+ * @brief Allocate and return a new empty message block of size \a span_size
+ * mimicking parameters of \a mb.
+ *
+ * This function allocates a new aligned message block using the same
+ * allocators and flags as found in \a mb. The size of the new message
+ * block is at least \a span_size; the size may be adjusted up in order
+ * to accomodate alignment requirements and still fit \a span_size bytes
+ * into the aligned buffer.
+ *
+ * @param mb message block whose parameters should be mimicked
+ * @param span_size size of the new message block (will be adjusted for proper
+ * alignment)
+ * @return an aligned message block with rd_ptr sitting at correct
+ * alignment spot, 0 on failure
+ */
+static ACE_Message_Block*
+clone_mb_nocopy_size (ACE_Message_Block *mb, size_t span_size)
+{
+ // Calculate the required size of the cloned block with alignment
+ size_t const aligned_size = ACE_CDR::first_size (span_size + ACE_CDR::MAX_ALIGNMENT);
+
+ // Get the allocators
+ ACE_Allocator *data_allocator = 0;
+ ACE_Allocator *data_block_allocator = 0;
+ ACE_Allocator *message_block_allocator = 0;
+ mb->access_allocators (data_allocator,
+ data_block_allocator,
+ message_block_allocator);
+
+ // Create a new Message Block
+ ACE_Message_Block *nb = 0;
+ ACE_NEW_MALLOC_RETURN (nb,
+ static_cast<ACE_Message_Block*> (
+ message_block_allocator->malloc (
+ sizeof (ACE_Message_Block))),
+ ACE_Message_Block(aligned_size,
+ mb->msg_type(),
+ mb->cont(),
+ 0, //we want the data block created
+ data_allocator,
+ mb->locking_strategy(),
+ mb->msg_priority(),
+ mb->msg_execution_time (),
+ mb->msg_deadline_time (),
+ data_block_allocator,
+ message_block_allocator),
+ 0);
+
+ ACE_CDR::mb_align (nb);
+
+ // Copy the flags over, but be SURE to clear the DONT_DELETE flag, since
+ // we just dynamically allocated the two things.
+ nb->set_flags (mb->flags());
+ nb->clr_flags (ACE_Message_Block::DONT_DELETE);
+
+ return nb;
+}
+
+TAO_Queued_Data::TAO_Queued_Data (ACE_Allocator *alloc)
+ : msg_block_ (0),
+ missing_data_ (0),
+ major_version_ (0),
+ minor_version_ (0),
+ byte_order_ (0),
+ more_fragments_ (0),
+ msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR),
+ next_ (0),
+ allocator_ (alloc)
+{
+}
+
+TAO_Queued_Data::TAO_Queued_Data (ACE_Message_Block *mb,
+ ACE_Allocator *alloc)
+ : msg_block_ (mb),
+ missing_data_ (0),
+ major_version_ (0),
+ minor_version_ (0),
+ byte_order_ (0),
+ more_fragments_ (0),
+ msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR),
+ next_ (0),
+ allocator_ (alloc)
+{
+}
+
+TAO_Queued_Data::TAO_Queued_Data (const TAO_Queued_Data &qd)
+ : msg_block_ (qd.msg_block_->duplicate ()),
+ missing_data_ (qd.missing_data_),
+ major_version_ (qd.major_version_),
+ minor_version_ (qd.minor_version_),
+ byte_order_ (qd.byte_order_),
+ more_fragments_ (qd.more_fragments_),
+ msg_type_ (qd.msg_type_),
+ next_ (0),
+ allocator_ (qd.allocator_)
+{
+}
+
+/*static*/
+TAO_Queued_Data *
+TAO_Queued_Data::make_queued_data (ACE_Allocator *message_buffer_alloc,
+ ACE_Allocator *input_cdr_alloc,
+ ACE_Data_Block *db)
+{
+ // Get a node for the queue..
+ TAO_Queued_Data *qd = 0;
+
+ if (message_buffer_alloc)
+ {
+ ACE_NEW_MALLOC_RETURN (qd,
+ static_cast<TAO_Queued_Data *> (
+ message_buffer_alloc->malloc (sizeof (TAO_Queued_Data))),
+ TAO_Queued_Data (message_buffer_alloc),
+ 0);
+
+ }
+ else
+ {
+ // No allocator, so use the global pool!
+ ACE_NEW_RETURN (qd,
+ TAO_Queued_Data,
+ 0);
+ }
+
+ // Providing an ACE_Data_Block indicates that the caller wants
+ // an aligned ACE_Message_Block added to the TAO_Queued_Data.
+ if (db != 0)
+ {
+ // If this allocation fails, the TAO_Queued_Data will be leaked.
+ if (input_cdr_alloc == 0)
+ ACE_NEW_RETURN (qd->msg_block_,
+ ACE_Message_Block (db,
+ 0,
+ input_cdr_alloc),
+ 0);
+ else
+ ACE_NEW_MALLOC_RETURN (qd->msg_block_,
+ static_cast<ACE_Message_Block*> (
+ input_cdr_alloc->malloc (sizeof (ACE_Message_Block))),
+ ACE_Message_Block (db,
+ 0,
+ input_cdr_alloc),
+ 0);
+
+ ACE_CDR::mb_align (qd->msg_block_);
+ }
+
+ return qd;
+}
+
+/*static*/
+void
+TAO_Queued_Data::release (TAO_Queued_Data *qd)
+{
+ //// TODO
+ ACE_Message_Block::release (qd->msg_block_);
+
+ if (qd->allocator_)
+ {
+ ACE_DES_FREE (qd,
+ qd->allocator_->free,
+ TAO_Queued_Data);
+
+ return;
+ }
+
+ // @@todo: Need to be removed at some point of time!
+ if (TAO_debug_level == 4)
+ {
+ // This debug is for testing purposes!
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Queued_Data[%d]::release\n",
+ "Using global pool for releasing \n"));
+ }
+ delete qd;
+
+}
+
+
+TAO_Queued_Data *
+TAO_Queued_Data::duplicate (TAO_Queued_Data &sqd)
+{
+ // Check to see if the underlying block is on the stack. If not it
+ // is fine. If the datablock is on stack, try to make a copy of that
+ // before doing a duplicate.
+ // @@ todo: Theoretically this should be within the Message Block,
+ // but we dont have much scope to do this in that mess. Probably in
+ // the next stage of MB rewrite we should be okay
+ ACE_Message_Block::Message_Flags fl =
+ sqd.msg_block_->self_flags ();
+
+ if (ACE_BIT_ENABLED (fl,
+ ACE_Message_Block::DONT_DELETE))
+ (void) TAO_Queued_Data::replace_data_block (*sqd.msg_block_);
+
+
+ TAO_Queued_Data *qd = 0;
+
+ if (sqd.allocator_)
+ {
+ ACE_NEW_MALLOC_RETURN (qd,
+ static_cast<TAO_Queued_Data *> (
+ sqd.allocator_->malloc (sizeof (TAO_Queued_Data))),
+ TAO_Queued_Data (sqd),
+ 0);
+
+ return qd;
+ }
+
+ // No allocator, so use the global pool!
+ // @@ TODO: We should be removing this at some point of time!
+ if (TAO_debug_level == 4)
+ {
+ // This debug is for testing purposes!
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Queued_Data[%d]::duplicate\n",
+ "Using global pool for allocation \n"));
+ }
+
+ ACE_NEW_RETURN (qd,
+ TAO_Queued_Data (sqd),
+ 0);
+
+ return qd;
+}
+
+int
+TAO_Queued_Data::consolidate (void)
+{
+ // Is this a chain of fragments?
+ if (this->more_fragments_ && this->msg_block_->cont () != 0)
+ {
+ // Create a message block big enough to hold the entire chain
+ ACE_Message_Block *dest = clone_mb_nocopy_size (
+ this->msg_block_,
+ this->msg_block_->total_length ());
+
+ if (0 == dest)
+ {
+ // out of memory
+ return -1;
+ }
+ // Memory allocation succeeded, the new message block can hold the consolidated
+ // message. The following code just copies all the data into this new message block.
+ // No further memory allocation will take place.
+
+ // Reset the cont() parameter. We have cloned the message
+ // block but not the chain as we will no longer have chain.
+ dest->cont (0);
+
+ // Use ACE_CDR to consolidate the chain for us
+ ACE_CDR::consolidate (dest, this->msg_block_);
+
+ // free the original message block chain
+ this->msg_block_->release ();
+
+ // Set the message block to the new consolidated message block
+ this->msg_block_ = dest;
+ this->more_fragments_ = 0;
+ }
+
+ return 0;
+}
+
+TAO_END_VERSIONED_NAMESPACE_DECL
diff --git a/TAO/tao/Queued_Data.h b/TAO/tao/Queued_Data.h
new file mode 100644
index 00000000000..3376cf48bff
--- /dev/null
+++ b/TAO/tao/Queued_Data.h
@@ -0,0 +1,133 @@
+// -*- C++ -*-
+
+//=============================================================================
+/**
+ * @file Queued_Data.h
+ *
+ * $Id$
+ *
+ * @author Balachandran Natarajan <bala@cs.wustl.edu>
+ */
+//=============================================================================
+
+#ifndef TAO_QUEUED_DATA_H
+#define TAO_QUEUED_DATA_H
+
+#include /**/ "ace/pre.h"
+
+#include "tao/Pluggable_Messaging_Utils.h"
+#include "ace/Message_Block.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+ACE_BEGIN_VERSIONED_NAMESPACE_DECL
+class ACE_Allocator;
+ACE_END_VERSIONED_NAMESPACE_DECL
+
+TAO_BEGIN_VERSIONED_NAMESPACE_DECL
+
+/**
+ * @class TAO_Queued_Data
+ *
+ * @brief Represents a node in the queue of incoming messages.
+ *
+ * This class contains necessary information about a message that is
+ * stored in the queue. Such a node can be used by the incoming thread
+ * from the reactor to dequeue and process the message by sending it
+ * to the higher layers of the ORB.
+ *
+ * The ACE_Message_Block contained within this class may contain a chain
+ * of message blocks (usually when GIOP fragments are involved). In that
+ * case consolidate () needs to be called prior to being sent to higher
+ * layers of the ORB when the GIOP fragment chain is complete.
+ */
+
+class TAO_Export TAO_Queued_Data
+{
+public:
+ /// Default Constructor
+ TAO_Queued_Data (ACE_Allocator *alloc = 0);
+
+ /// Constructor.
+ TAO_Queued_Data (ACE_Message_Block *mb, ACE_Allocator *alloc = 0);
+
+ /// Copy constructor.
+ TAO_Queued_Data (const TAO_Queued_Data &qd);
+
+ /// Creation of a node in the queue.
+ static TAO_Queued_Data* make_queued_data (
+ ACE_Allocator *message_buffer_alloc = 0,
+ ACE_Allocator *input_cdr_alloc = 0,
+ ACE_Data_Block *db = 0);
+
+ /// Deletion of a node from the queue.
+ static void release (TAO_Queued_Data *qd);
+
+ /// Duplicate ourselves. This creates a copy of ourselves on the
+ /// heap and returns a pointer to the duplicated node.
+ static TAO_Queued_Data* duplicate (TAO_Queued_Data &qd);
+
+ /// Consolidate this fragments chained message blocks into one.
+ /// @return -1 if consolidation failed, eg out or memory, otherwise 0
+ int consolidate (void);
+
+public:
+
+ /// The message block that contains the message.
+ ACE_Message_Block *msg_block_;
+
+ /*!
+ @name Missing Data details
+
+ The \a missing_data_ member contains the number of bytes of
+ data missing from \a msg_block_.
+ */
+ //@{
+ /*! Data missing in the above message that hasn't been read or processed yet,
+ the value TAO_MISSING_DATA_UNDEFINED indicates it hasn't been processed yet,
+ otherwise greater or equal zero. */
+ size_t missing_data_;
+ //@}
+
+ /// Many protocols like GIOP have a major and minor version
+ /// information that would be needed to read and decipher the
+ /// message.
+ CORBA::Octet major_version_;
+ CORBA::Octet minor_version_;
+
+ /// The byte order of the message that is stored in the node.
+ CORBA::Octet byte_order_;
+
+ /// Some messages can be fragmented by the protocol (this is an ORB
+ /// level fragmentation on top of the TCP/IP fragmentation. This
+ /// member indicates whether the message that we have recd. and
+ /// queue already has more fragments that is missing..
+ CORBA::Octet more_fragments_;
+
+ /// The message type of the message
+ TAO_Pluggable_Message_Type msg_type_;
+
+ /// Pounter to the next element in the queue.
+ TAO_Queued_Data *next_;
+
+private:
+ /// Replace the datablock with a one allocated on the heap or
+ /// allocator
+ static void replace_data_block (ACE_Message_Block &mb);
+
+private:
+
+ /// The allocator used to allocate this class.
+ ACE_Allocator *allocator_;
+};
+
+TAO_END_VERSIONED_NAMESPACE_DECL
+
+#if defined (__ACE_INLINE__)
+# include "tao/Queued_Data.inl"
+#endif /* __ACE_INLINE__ */
+
+#include /**/ "ace/post.h"
+#endif /*TAO_QUEUED_DATA_H*/
diff --git a/TAO/tao/Queued_Data.inl b/TAO/tao/Queued_Data.inl
new file mode 100644
index 00000000000..67b3cd89f4c
--- /dev/null
+++ b/TAO/tao/Queued_Data.inl
@@ -0,0 +1,33 @@
+// -*- C++ -*-
+//
+//$Id$
+
+TAO_BEGIN_VERSIONED_NAMESPACE_DECL
+
+/*static*/
+ACE_INLINE void
+TAO_Queued_Data::replace_data_block (ACE_Message_Block &mb)
+{
+ size_t const newsize =
+ ACE_CDR::total_length (&mb, 0) + ACE_CDR::MAX_ALIGNMENT;
+
+ ACE_Data_Block *db =
+ mb.data_block ()->clone_nocopy ();
+
+ if (db->size (newsize) == -1)
+ return;
+
+ ACE_Message_Block tmp (db);
+ ACE_CDR::mb_align (&tmp);
+
+ tmp.copy (mb.rd_ptr (), mb.length());
+ mb.data_block (tmp.data_block ()->duplicate ());
+
+ mb.rd_ptr (tmp.rd_ptr ());
+ mb.wr_ptr (tmp.wr_ptr ());
+
+ // Remove the DONT_DELETE flags from mb
+ mb.clr_self_flags (ACE_Message_Block::DONT_DELETE);
+}
+
+TAO_END_VERSIONED_NAMESPACE_DECL
diff --git a/TAO/tao/tao.mpc b/TAO/tao/tao.mpc
index adb3f9f0f4b..673fcec795e 100644
--- a/TAO/tao/tao.mpc
+++ b/TAO/tao/tao.mpc
@@ -176,6 +176,7 @@ project(TAO) : acelib, install, tao_output, taodefaults, pidl, extra_core, tao_v
Protocol_Factory.cpp
Protocols_Hooks.cpp
Network_Priority_Protocols_Hooks.cpp
+ Queued_Data.cpp
Queued_Message.cpp
Reactive_Connect_Strategy.cpp
Reactive_Flushing_Strategy.cpp
@@ -468,6 +469,7 @@ project(TAO) : acelib, install, tao_output, taodefaults, pidl, extra_core, tao_v
Network_Priority_Protocols_Hooks.h
Pseudo_VarOut_T.h
Queued_Message.h
+ Queued_Data.h
Range_Checking_T.h
Reactive_Connect_Strategy.h
Reactive_Flushing_Strategy.h