summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbala <balanatarajan@users.noreply.github.com>2002-07-14 15:38:38 +0000
committerbala <balanatarajan@users.noreply.github.com>2002-07-14 15:38:38 +0000
commitde89a6a92a1ce4bbbf79be4078a86707434bcf50 (patch)
tree21f62ce4f23a52dc8c7c7ae1ae8687c79c0003f3
parenta1322e0c75ff38401e95e481d90d46f449d5b1c2 (diff)
downloadATCD-de89a6a92a1ce4bbbf79be4078a86707434bcf50.tar.gz
ChangeLogTag:Sun Jul 14 10:49:33 2002 Balachandran Natarajan <bala@cs.wustl.edu>
-rw-r--r--TAO/tao/Asynch_Queued_Message.cpp90
-rw-r--r--TAO/tao/Asynch_Queued_Message.h23
-rw-r--r--TAO/tao/ChangeLog15
-rw-r--r--TAO/tao/Incoming_Message_Queue.cpp2
-rw-r--r--TAO/tao/Queued_Message.cpp6
-rw-r--r--TAO/tao/Queued_Message.h34
-rw-r--r--TAO/tao/Synch_Queued_Message.cpp68
-rw-r--r--TAO/tao/Synch_Queued_Message.h4
-rw-r--r--TAO/tao/Transport.cpp33
9 files changed, 260 insertions, 15 deletions
diff --git a/TAO/tao/Asynch_Queued_Message.cpp b/TAO/tao/Asynch_Queued_Message.cpp
index 3b87bbaf445..ebd833677b8 100644
--- a/TAO/tao/Asynch_Queued_Message.cpp
+++ b/TAO/tao/Asynch_Queued_Message.cpp
@@ -1,5 +1,6 @@
#include "Asynch_Queued_Message.h"
-
+#include "debug.h"
+#include "ace/Malloc_T.h"
#include "ace/Log_Msg.h"
@@ -30,10 +31,22 @@ TAO_Asynch_Queued_Message::
}
}
+TAO_Asynch_Queued_Message::TAO_Asynch_Queued_Message (char *buf,
+ size_t size,
+ ACE_Allocator *alloc)
+ : TAO_Queued_Message (alloc)
+ , size_ (size)
+ , offset_ (0)
+ , buffer_ (buf)
+{
+}
+
TAO_Asynch_Queued_Message::~TAO_Asynch_Queued_Message (void)
{
// @@ Use a pool for these guys!
delete[] this->buffer_;
+ this->is_heap_created_ = 0;
+ this->allocator_ = 0;
}
size_t
@@ -80,9 +93,80 @@ TAO_Asynch_Queued_Message::bytes_transferred (size_t &byte_count)
this->state_changed (TAO_LF_Event::LFS_SUCCESS);
}
+
+TAO_Queued_Message *
+TAO_Asynch_Queued_Message::clone (ACE_Allocator *alloc)
+{
+ char *buf = 0;
+
+ // @@todo: Need to use a memory pool. But certain things need to
+ // change a bit in this class for that. Till then.
+
+ // Just allocate and copy data that needs to be sent, no point
+ // copying the whole buffer.
+ size_t sz = this->size_ - this->offset_;
+
+ ACE_NEW_RETURN (buf,
+ char[sz],
+ 0);
+
+ ACE_OS::memcpy (buf,
+ this->buffer_ + this->offset_,
+ sz);
+
+ TAO_Asynch_Queued_Message *qm = 0;
+
+ if (alloc)
+ {
+ ACE_NEW_MALLOC_RETURN (qm,
+ ACE_static_cast (TAO_Asynch_Queued_Message *,
+ alloc->malloc (sizeof (TAO_Asynch_Queued_Message))),
+ TAO_Asynch_Queued_Message (buf,
+ sz,
+ alloc),
+ 0);
+ }
+ else
+ {
+ // No allocator, so use the common heap!
+ if (TAO_debug_level == 4)
+ {
+ // This debug is for testing purposes!
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Asynch_Queued_Message::clone\n",
+ "Using global pool for allocation \n"));
+ }
+
+ ACE_NEW_RETURN (qm,
+ TAO_Asynch_Queued_Message (buf,
+ sz),
+ 0);
+ }
+
+ // Set the flag to indicate that <qm> is created on the heap.
+ if (qm)
+ qm->is_heap_created_ = 1;
+
+ return qm;
+}
+
void
TAO_Asynch_Queued_Message::destroy (void)
{
- // @@ Maybe it comes from a pool, we should do something about it.
- delete this;
+ if (this->is_heap_created_)
+ {
+ // If we have an allocator release the memory to the allocator
+ // pool.
+ if (this->allocator_)
+ {
+ ACE_DES_FREE (this,
+ this->allocator_->free,
+ TAO_Asynch_Queued_Message);
+
+ }
+ else // global release..
+ {
+ delete this;
+ }
+ }
}
diff --git a/TAO/tao/Asynch_Queued_Message.h b/TAO/tao/Asynch_Queued_Message.h
index a2708691abd..4f8ba0a1d10 100644
--- a/TAO/tao/Asynch_Queued_Message.h
+++ b/TAO/tao/Asynch_Queued_Message.h
@@ -34,6 +34,8 @@ public:
/**
* @param contents The message block chain that must be sent.
*
+ * @param alloc Allocator used for creating <this> object.
+ *
* @todo I'm almost sure this class will require a callback
* interface for AMIs sent with SYNC_NONE policy. Those guys
* need to hear when the connection timeouts or closes, but
@@ -42,6 +44,8 @@ public:
TAO_Asynch_Queued_Message (const ACE_Message_Block *contents,
ACE_Allocator *alloc = 0);
+
+
/// Destructor
virtual ~TAO_Asynch_Queued_Message (void);
@@ -52,9 +56,28 @@ public:
virtual int all_data_sent (void) const;
virtual void fill_iov (int iovcnt_max, int &iovcnt, iovec iov[]) const;
virtual void bytes_transferred (size_t &byte_count);
+ /// @@NOTE: No reason to belive why this would be called. But have
+ /// it here for the sake of uniformity.
+ virtual TAO_Queued_Message *clone (ACE_Allocator *alloc);
virtual void destroy (void);
//@}
+protected:
+ /// Constructor
+ /**
+ * @param buf The buffer that needs to be sent on the wire. The
+ * buffer will be owned by this class. The buffer will be
+ * deleted when the destructor is called and hence the
+ * buffer should always come off the heap!
+ *
+ * @param size The size of the buffer <buf> that is being handed
+ * over.
+ *
+ * @param alloc Allocator used for creating <this> object.
+ */
+ TAO_Asynch_Queued_Message (char *buf,
+ size_t size,
+ ACE_Allocator *alloc = 0);
private:
/// The number of bytes in the buffer
size_t size_;
diff --git a/TAO/tao/ChangeLog b/TAO/tao/ChangeLog
index fa198b7dc0b..76a64b84e74 100644
--- a/TAO/tao/ChangeLog
+++ b/TAO/tao/ChangeLog
@@ -1,3 +1,18 @@
+Sun Jul 14 10:49:33 2002 Balachandran Natarajan <bala@cs.wustl.edu>
+
+ * tao/Queued_Message.h:
+ * tao/Queued_Message.cpp: Added an argument in the constructor to
+ pass an allocator. Added a pure virtual method, clone () to the
+ interface.
+
+ * tao/Synch_Queued_Message.h:
+ * tao/Synch_Queued_Message.cpp:
+ * tao/Asynch_Queued_Message.h:
+ * tao/Asynch_Queued_Message.cpp: Implemented the method clone ().
+
+ * tao/Incoming_Mesage_Queue.cpp: Fixed a typo in a print
+ statement.
+
Fri Jul 12 14:10:14 2002 Balachandran Natarajan <bala@cs.wustl.edu>
* tao/Transport.h:
diff --git a/TAO/tao/Incoming_Message_Queue.cpp b/TAO/tao/Incoming_Message_Queue.cpp
index 0331c30d572..ef589e39cb8 100644
--- a/TAO/tao/Incoming_Message_Queue.cpp
+++ b/TAO/tao/Incoming_Message_Queue.cpp
@@ -194,7 +194,7 @@ TAO_Queued_Data::get_queued_data (ACE_Allocator *alloc)
{
// This debug is for testing purposes!
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Queued_Data[%d]::get_queued_data\n",
+ "TAO (%P|%t) - Queued_Data::get_queued_data\n",
"Using global pool for allocation \n"));
}
diff --git a/TAO/tao/Queued_Message.cpp b/TAO/tao/Queued_Message.cpp
index 68d308b2eeb..dfb957674f4 100644
--- a/TAO/tao/Queued_Message.cpp
+++ b/TAO/tao/Queued_Message.cpp
@@ -9,8 +9,10 @@
ACE_RCSID(tao, Queued_Message, "$Id$")
-TAO_Queued_Message::TAO_Queued_Message (void)
- : next_ (0)
+TAO_Queued_Message::TAO_Queued_Message (ACE_Allocator *alloc)
+ : allocator_ (alloc)
+ , is_heap_created_ (0)
+ , next_ (0)
, prev_ (0)
{
}
diff --git a/TAO/tao/Queued_Message.h b/TAO/tao/Queued_Message.h
index a49bac11b5c..47a98735a4d 100644
--- a/TAO/tao/Queued_Message.h
+++ b/TAO/tao/Queued_Message.h
@@ -23,7 +23,7 @@
#include "LF_Event.h"
class ACE_Message_Block;
-
+class ACE_Allocator;
/**
* @class TAO_Queued_Message
*
@@ -66,7 +66,7 @@ class TAO_Export TAO_Queued_Message : public TAO_LF_Event
{
public:
/// Constructor
- TAO_Queued_Message (void);
+ TAO_Queued_Message (ACE_Allocator *alloc = 0);
/// Destructor
virtual ~TAO_Queued_Message (void);
@@ -143,7 +143,9 @@ public:
* method should update this counter
* @param iov The io vector
*/
- virtual void fill_iov (int iovcnt_max, int &iovcnt, iovec iov[]) const = 0;
+ virtual void fill_iov (int iovcnt_max,
+ int &iovcnt,
+ iovec iov[]) const = 0;
/// Update the internal state, data has been sent.
/**
@@ -161,6 +163,18 @@ public:
*/
virtual void bytes_transferred (size_t &byte_count) = 0;
+ /// Clone this element
+ /*
+ * Clone the element and return a pointer to the cloned element on
+ * the heap.
+ *
+ * @param allocator Use the allocator for creating the new element
+ * on the heap. Remember, that the allocator will
+ * not be used allocate the data contained in this
+ * element.
+ */
+ virtual TAO_Queued_Message *clone (ACE_Allocator *allocator) = 0;
+
/// Reclaim resources
/**
* Reliable messages are allocated from the stack, thus they do not
@@ -171,6 +185,20 @@ public:
virtual void destroy (void) = 0;
//@}
+protected:
+ /*
+ * Allocator that was used to create <this> object on the heap. If the
+ * allocator is null then <this> is on stack.
+ */
+ ACE_Allocator *allocator_;
+
+ /*
+ * A flag that acts as a boolean to indicate whether <this> is on
+ * stack or heap. A non-zero value indicates that <this> was created
+ * on heap.
+ */
+ int is_heap_created_;
+
private:
/// Implement an intrusive double-linked list for the message queue
TAO_Queued_Message *next_;
diff --git a/TAO/tao/Synch_Queued_Message.cpp b/TAO/tao/Synch_Queued_Message.cpp
index 75aa7a8c7d5..fe6007fc1de 100644
--- a/TAO/tao/Synch_Queued_Message.cpp
+++ b/TAO/tao/Synch_Queued_Message.cpp
@@ -1,4 +1,6 @@
#include "Synch_Queued_Message.h"
+#include "debug.h"
+#include "ace/Malloc_T.h"
#include "ace/Log_Msg.h"
@@ -18,6 +20,8 @@ TAO_Synch_Queued_Message::
TAO_Synch_Queued_Message::~TAO_Synch_Queued_Message (void)
{
+ this->is_heap_created_ = 0;
+ this->allocator_ = 0;
}
const ACE_Message_Block *
@@ -94,7 +98,71 @@ TAO_Synch_Queued_Message::bytes_transferred (size_t &byte_count)
this->state_changed (TAO_LF_Event::LFS_SUCCESS);
}
+TAO_Queued_Message *
+TAO_Synch_Queued_Message::clone (ACE_Allocator *alloc)
+{
+ TAO_Synch_Queued_Message *qm = 0;
+
+ // Clone the message block.
+ // NOTE: We wantedly do the cloning from <current_block_> instead of
+ // starting from <contents_> since we dont want to clone blocks that
+ // have already been sent on the wire. Waste of memory and
+ // associated copying.
+ ACE_Message_Block *mb =
+ this->current_block_->clone ();
+
+ if (alloc)
+ {
+ ACE_NEW_MALLOC_RETURN (qm,
+ ACE_static_cast (TAO_Synch_Queued_Message *,
+ alloc->malloc (sizeof (TAO_Synch_Queued_Message))),
+ TAO_Synch_Queued_Message (mb,
+ alloc),
+ 0);
+ }
+ else
+ {
+ // No allocator, so use the common heap!
+ if (TAO_debug_level == 4)
+ {
+ // This debug is for testing purposes!
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Synch_Queued_Message::clone\n",
+ "Using global pool for allocation \n"));
+ }
+
+ ACE_NEW_RETURN (qm,
+ TAO_Synch_Queued_Message (mb),
+ 0);
+ }
+
+ // Set the flag to indicate that <qm> is created on the heap.
+ if (qm)
+ qm->is_heap_created_ = 1;
+
+ return qm;
+}
+
void
TAO_Synch_Queued_Message::destroy (void)
{
+ if (this->is_heap_created_)
+ {
+ ACE_Message_Block::release (this->contents_);
+ this->current_block_ = 0;
+
+ // If we have an allocator release the memory to the allocator
+ // pool.
+ if (this->allocator_)
+ {
+ ACE_DES_FREE (this,
+ this->allocator_->free,
+ TAO_Synch_Queued_Message);
+
+ }
+ else // global release..
+ {
+ delete this;
+ }
+ }
}
diff --git a/TAO/tao/Synch_Queued_Message.h b/TAO/tao/Synch_Queued_Message.h
index b9f4c315971..a5644b643e5 100644
--- a/TAO/tao/Synch_Queued_Message.h
+++ b/TAO/tao/Synch_Queued_Message.h
@@ -51,7 +51,8 @@ public:
* @param alloc The allocator that is used to allocate objects of
* this type.
*/
- TAO_Synch_Queued_Message (const ACE_Message_Block *contents;
+ TAO_Synch_Queued_Message (const ACE_Message_Block *contents,
+ ACE_Allocator *alloc = 0);
@@ -67,6 +68,7 @@ public:
virtual int all_data_sent (void) const;
virtual void fill_iov (int iovcnt_max, int &iovcnt, iovec iov[]) const;
virtual void bytes_transferred (size_t &byte_count);
+ virtual TAO_Queued_Message *clone (ACE_Allocator *alloc);
virtual void destroy (void);
//@}
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp
index eb9e8ea890a..84185cb4599 100644
--- a/TAO/tao/Transport.cpp
+++ b/TAO/tao/Transport.cpp
@@ -509,6 +509,8 @@ TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb,
// the message block.
TAO_Synch_Queued_Message synch_message (mb);
+ synch_message.push_back (this->head_, this->tail_);
+
int n =
this->send_synch_message_helper_i (synch_message,
max_wait_time);
@@ -597,6 +599,9 @@ TAO_Transport::send_reply_message_i (const ACE_Message_Block *mb,
// Dont clone now.. We could be sent in one shot!
TAO_Synch_Queued_Message synch_message (mb);
+ synch_message.push_back (this->head_,
+ this->tail_);
+
int n =
this->send_synch_message_helper_i (synch_message,
max_wait_time);
@@ -608,22 +613,40 @@ TAO_Transport::send_reply_message_i (const ACE_Message_Block *mb,
ACE_ASSERT (n == 0);
// Till this point we shouldnt have any copying and that is the
- // point anyway.
- // Now, remove the node from the list
+ // point anyway. Now, remove the node from the list
+ synch_message.remove_from_list (this->head_,
+ this->tail_);
- // Clone the node
+ ACE_Message_Block *tmp_mb =
+ ACE_const_cast (ACE_Message_Block *,
+ mb);
+ // Reset the message block allocators to allocate memory from the
+ // global pool.
+ tmp_mb->reset_allocators (this->orb_core_->input_cdr_buffer_allocator (),
+ this->orb_core_->input_cdr_dblock_allocator (),
+ this->orb_core_->input_cdr_msgblock_allocator ());
+ // Clone the node that we have.
+ TAO_Queued_Message *msg =
+ synch_message.clone (this->orb_core_->transport_message_buffer_allocator ());
+ // Stick it in the queue
+ msg->push_back (this->head_,
+ this->tail_);
+
+ TAO_Flushing_Strategy *flushing_strategy =
+ this->orb_core ()->flushing_strategy ();
+ (void) flushing_strategy->schedule_output (this);
+
+ return 1;
}
int
TAO_Transport::send_synch_message_helper_i (TAO_Synch_Queued_Message &synch_message,
ACE_Time_Value * /*max_wait_time*/)
{
- synch_message.push_back (this->head_, this->tail_);
-
// @@todo: Need to send timeouts for writing..
int n = this->drain_queue_i ();
if (n == -1)