summaryrefslogtreecommitdiff
path: root/TAO/tao/Incoming_Message_Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/tao/Incoming_Message_Queue.cpp')
-rw-r--r--TAO/tao/Incoming_Message_Queue.cpp379
1 files changed, 379 insertions, 0 deletions
diff --git a/TAO/tao/Incoming_Message_Queue.cpp b/TAO/tao/Incoming_Message_Queue.cpp
new file mode 100644
index 00000000000..155b87b4c82
--- /dev/null
+++ b/TAO/tao/Incoming_Message_Queue.cpp
@@ -0,0 +1,379 @@
+#include "tao/Incoming_Message_Queue.h"
+#include "tao/debug.h"
+
+#include "ace/Log_Msg.h"
+#include "ace/Malloc_Base.h"
+
+
+#if !defined (__ACE_INLINE__)
+# include "tao/Incoming_Message_Queue.inl"
+#endif /* __ACE_INLINE__ */
+
+
+ACE_RCSID (tao,
+ Incoming_Message_Queue,
+ "$Id$")
+
+TAO_BEGIN_VERSIONED_NAMESPACE_DECL
+
+TAO_Incoming_Message_Queue::TAO_Incoming_Message_Queue (TAO_ORB_Core *orb_core)
+ : last_added_ (0),
+ size_ (0),
+ orb_core_ (orb_core)
+{
+}
+
+TAO_Incoming_Message_Queue::~TAO_Incoming_Message_Queue (void)
+{
+ const CORBA::ULong sz = this->size_;
+
+ // Delete all the nodes left behind
+ for (CORBA::ULong i = 0;
+ i < sz;
+ ++i)
+ {
+ TAO_Queued_Data *qd = this->dequeue_head ();
+ TAO_Queued_Data::release (qd);
+ }
+}
+
+
+TAO_Queued_Data *
+TAO_Incoming_Message_Queue::dequeue_head (void)
+{
+ if (this->size_ == 0)
+ return 0;
+
+ // Get the node on the head of the queue...
+ TAO_Queued_Data * const head = this->last_added_->next_;
+
+ // Reset the head node..
+ this->last_added_->next_ = head->next_;
+
+ // Decrease the size and reset last_added_ if empty
+ if (--this->size_ == 0)
+ this->last_added_ = 0;
+
+ return head;
+}
+
+TAO_Queued_Data *
+TAO_Incoming_Message_Queue::dequeue_tail (void)
+{
+ // This is a bit painful stuff...
+ if (this->size_ == 0)
+ return 0;
+
+ // Get the node on the head of the queue...
+ TAO_Queued_Data *head =
+ this->last_added_->next_;
+
+ while (head->next_ != this->last_added_)
+ {
+ head = head->next_;
+ }
+
+ // Put the head in tmp.
+ head->next_ = this->last_added_->next_;
+
+ TAO_Queued_Data *ret_qd = this->last_added_;
+
+ this->last_added_ = head;
+
+ // Decrease the size
+ if (--this->size_ == 0)
+ this->last_added_ = 0;
+
+ return ret_qd;
+}
+
+int
+TAO_Incoming_Message_Queue::enqueue_tail (TAO_Queued_Data *nd)
+{
+ if (this->size_ == 0)
+ {
+ this->last_added_ = nd;
+ this->last_added_->next_ = this->last_added_;
+ }
+ else
+ {
+ nd->next_ = this->last_added_->next_;
+ this->last_added_->next_ = nd;
+ this->last_added_ = nd;
+ }
+
+ ++ this->size_;
+ return 0;
+}
+
+
+/************************************************************************/
+// Methods for TAO_Queued_Data
+/************************************************************************/
+
+/*!
+ * @brief Allocate and return a new empty message block of size \a span_size
+ * mimicking parameters of \a mb.
+ *
+ * This function allocates a new aligned message block using the same
+ * allocators and flags as found in \a mb. The size of the new message
+ * block is at least \a span_size; the size may be adjusted up in order
+ * to accomodate alignment requirements and still fit \a span_size bytes
+ * into the aligned buffer.
+ *
+ * @param mb message block whose parameters should be mimicked
+ * @param span_size size of the new message block (will be adjusted for proper
+ * alignment)
+ * @return an aligned message block with rd_ptr sitting at correct
+ * alignment spot, 0 on failure
+ */
+static ACE_Message_Block*
+clone_mb_nocopy_size (ACE_Message_Block *mb, size_t span_size)
+{
+ // Calculate the required size of the cloned block with alignment
+ size_t const aligned_size = ACE_CDR::first_size (span_size + ACE_CDR::MAX_ALIGNMENT);
+
+ // Get the allocators
+ ACE_Allocator *data_allocator = 0;
+ ACE_Allocator *data_block_allocator = 0;
+ ACE_Allocator *message_block_allocator = 0;
+ mb->access_allocators (data_allocator,
+ data_block_allocator,
+ message_block_allocator);
+
+ // Create a new Message Block
+ ACE_Message_Block *nb = 0;
+ ACE_NEW_MALLOC_RETURN (nb,
+ static_cast<ACE_Message_Block*> (
+ message_block_allocator->malloc (
+ sizeof (ACE_Message_Block))),
+ ACE_Message_Block(aligned_size,
+ mb->msg_type(),
+ mb->cont(),
+ 0, //we want the data block created
+ data_allocator,
+ mb->locking_strategy(),
+ mb->msg_priority(),
+ mb->msg_execution_time (),
+ mb->msg_deadline_time (),
+ data_block_allocator,
+ message_block_allocator),
+ 0);
+
+ ACE_CDR::mb_align (nb);
+
+ // Copy the flags over, but be SURE to clear the DONT_DELETE flag, since
+ // we just dynamically allocated the two things.
+ nb->set_flags (mb->flags());
+ nb->clr_flags (ACE_Message_Block::DONT_DELETE);
+
+ return nb;
+}
+
+TAO_Queued_Data::TAO_Queued_Data (ACE_Allocator *alloc)
+ : msg_block_ (0),
+ missing_data_ (0),
+ major_version_ (0),
+ minor_version_ (0),
+ byte_order_ (0),
+ more_fragments_ (0),
+ msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR),
+ next_ (0),
+ allocator_ (alloc)
+{
+}
+
+TAO_Queued_Data::TAO_Queued_Data (ACE_Message_Block *mb,
+ ACE_Allocator *alloc)
+ : msg_block_ (mb),
+ missing_data_ (0),
+ major_version_ (0),
+ minor_version_ (0),
+ byte_order_ (0),
+ more_fragments_ (0),
+ msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR),
+ next_ (0),
+ allocator_ (alloc)
+{
+}
+
+TAO_Queued_Data::TAO_Queued_Data (const TAO_Queued_Data &qd)
+ : msg_block_ (qd.msg_block_->duplicate ()),
+ missing_data_ (qd.missing_data_),
+ major_version_ (qd.major_version_),
+ minor_version_ (qd.minor_version_),
+ byte_order_ (qd.byte_order_),
+ more_fragments_ (qd.more_fragments_),
+ msg_type_ (qd.msg_type_),
+ next_ (0),
+ allocator_ (qd.allocator_)
+{
+}
+
+/*static*/
+TAO_Queued_Data *
+TAO_Queued_Data::make_queued_data (ACE_Allocator *message_buffer_alloc,
+ ACE_Allocator *input_cdr_alloc,
+ ACE_Data_Block *db)
+{
+ // Get a node for the queue..
+ TAO_Queued_Data *qd = 0;
+
+ if (message_buffer_alloc)
+ {
+ ACE_NEW_MALLOC_RETURN (qd,
+ static_cast<TAO_Queued_Data *> (
+ message_buffer_alloc->malloc (sizeof (TAO_Queued_Data))),
+ TAO_Queued_Data (message_buffer_alloc),
+ 0);
+
+ }
+ else
+ {
+ // No allocator, so use the global pool!
+ ACE_NEW_RETURN (qd,
+ TAO_Queued_Data,
+ 0);
+ }
+
+ // Providing an ACE_Data_Block indicates that the caller wants
+ // an aligned ACE_Message_Block added to the TAO_Queued_Data.
+ if (db != 0)
+ {
+ // If this allocation fails, the TAO_Queued_Data will be leaked.
+ if (input_cdr_alloc == 0)
+ ACE_NEW_RETURN (qd->msg_block_,
+ ACE_Message_Block (db,
+ 0,
+ input_cdr_alloc),
+ 0);
+ else
+ ACE_NEW_MALLOC_RETURN (qd->msg_block_,
+ static_cast<ACE_Message_Block*> (
+ input_cdr_alloc->malloc (sizeof (ACE_Message_Block))),
+ ACE_Message_Block (db,
+ 0,
+ input_cdr_alloc),
+ 0);
+
+ ACE_CDR::mb_align (qd->msg_block_);
+ }
+
+ return qd;
+}
+
+/*static*/
+void
+TAO_Queued_Data::release (TAO_Queued_Data *qd)
+{
+ //// TODO
+ ACE_Message_Block::release (qd->msg_block_);
+
+ if (qd->allocator_)
+ {
+ ACE_DES_FREE (qd,
+ qd->allocator_->free,
+ TAO_Queued_Data);
+
+ return;
+ }
+
+ // @@todo: Need to be removed at some point of time!
+ if (TAO_debug_level == 4)
+ {
+ // This debug is for testing purposes!
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Queued_Data[%d]::release\n",
+ "Using global pool for releasing \n"));
+ }
+ delete qd;
+
+}
+
+
+TAO_Queued_Data *
+TAO_Queued_Data::duplicate (TAO_Queued_Data &sqd)
+{
+ // Check to see if the underlying block is on the stack. If not it
+ // is fine. If the datablock is on stack, try to make a copy of that
+ // before doing a duplicate.
+ // @@ todo: Theoretically this should be within the Message Block,
+ // but we dont have much scope to do this in that mess. Probably in
+ // the next stage of MB rewrite we should be okay
+ ACE_Message_Block::Message_Flags fl =
+ sqd.msg_block_->self_flags ();
+
+ if (ACE_BIT_ENABLED (fl,
+ ACE_Message_Block::DONT_DELETE))
+ (void) TAO_Queued_Data::replace_data_block (*sqd.msg_block_);
+
+
+ TAO_Queued_Data *qd = 0;
+
+ if (sqd.allocator_)
+ {
+ ACE_NEW_MALLOC_RETURN (qd,
+ static_cast<TAO_Queued_Data *> (
+ sqd.allocator_->malloc (sizeof (TAO_Queued_Data))),
+ TAO_Queued_Data (sqd),
+ 0);
+
+ return qd;
+ }
+
+ // No allocator, so use the global pool!
+ // @@ TODO: We should be removing this at some point of time!
+ if (TAO_debug_level == 4)
+ {
+ // This debug is for testing purposes!
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Queued_Data[%d]::duplicate\n",
+ "Using global pool for allocation \n"));
+ }
+
+ ACE_NEW_RETURN (qd,
+ TAO_Queued_Data (sqd),
+ 0);
+
+ return qd;
+}
+
+int
+TAO_Queued_Data::consolidate (void)
+{
+ // Is this a chain of fragments?
+ if (this->more_fragments_ && this->msg_block_->cont () != 0)
+ {
+ // Create a message block big enough to hold the entire chain
+ ACE_Message_Block *dest = clone_mb_nocopy_size (
+ this->msg_block_,
+ this->msg_block_->total_length ());
+
+ if (0 == dest)
+ {
+ // out of memory
+ return -1;
+ }
+ // Memory allocation succeeded, the new message block can hold the consolidated
+ // message. The following code just copies all the data into this new message block.
+ // No further memory allocation will take place.
+
+ // Reset the cont() parameter. We have cloned the message
+ // block but not the chain as we will no longer have chain.
+ dest->cont (0);
+
+ // Use ACE_CDR to consolidate the chain for us
+ ACE_CDR::consolidate (dest, this->msg_block_);
+
+ // free the original message block chain
+ this->msg_block_->release ();
+
+ // Set the message block to the new consolidated message block
+ this->msg_block_ = dest;
+ this->more_fragments_ = 0;
+ }
+
+ return 0;
+}
+
+TAO_END_VERSIONED_NAMESPACE_DECL