diff options
author | bala <balanatarajan@users.noreply.github.com> | 2002-07-14 15:38:38 +0000 |
---|---|---|
committer | bala <balanatarajan@users.noreply.github.com> | 2002-07-14 15:38:38 +0000 |
commit | de89a6a92a1ce4bbbf79be4078a86707434bcf50 (patch) | |
tree | 21f62ce4f23a52dc8c7c7ae1ae8687c79c0003f3 | |
parent | a1322e0c75ff38401e95e481d90d46f449d5b1c2 (diff) | |
download | ATCD-de89a6a92a1ce4bbbf79be4078a86707434bcf50.tar.gz |
ChangeLogTag:Sun Jul 14 10:49:33 2002 Balachandran Natarajan <bala@cs.wustl.edu>
-rw-r--r-- | TAO/tao/Asynch_Queued_Message.cpp | 90 | ||||
-rw-r--r-- | TAO/tao/Asynch_Queued_Message.h | 23 | ||||
-rw-r--r-- | TAO/tao/ChangeLog | 15 | ||||
-rw-r--r-- | TAO/tao/Incoming_Message_Queue.cpp | 2 | ||||
-rw-r--r-- | TAO/tao/Queued_Message.cpp | 6 | ||||
-rw-r--r-- | TAO/tao/Queued_Message.h | 34 | ||||
-rw-r--r-- | TAO/tao/Synch_Queued_Message.cpp | 68 | ||||
-rw-r--r-- | TAO/tao/Synch_Queued_Message.h | 4 | ||||
-rw-r--r-- | TAO/tao/Transport.cpp | 33 |
9 files changed, 260 insertions, 15 deletions
diff --git a/TAO/tao/Asynch_Queued_Message.cpp b/TAO/tao/Asynch_Queued_Message.cpp index 3b87bbaf445..ebd833677b8 100644 --- a/TAO/tao/Asynch_Queued_Message.cpp +++ b/TAO/tao/Asynch_Queued_Message.cpp @@ -1,5 +1,6 @@ #include "Asynch_Queued_Message.h" - +#include "debug.h" +#include "ace/Malloc_T.h" #include "ace/Log_Msg.h" @@ -30,10 +31,22 @@ TAO_Asynch_Queued_Message:: } } +TAO_Asynch_Queued_Message::TAO_Asynch_Queued_Message (char *buf, + size_t size, + ACE_Allocator *alloc) + : TAO_Queued_Message (alloc) + , size_ (size) + , offset_ (0) + , buffer_ (buf) +{ +} + TAO_Asynch_Queued_Message::~TAO_Asynch_Queued_Message (void) { // @@ Use a pool for these guys! delete[] this->buffer_; + this->is_heap_created_ = 0; + this->allocator_ = 0; } size_t @@ -80,9 +93,80 @@ TAO_Asynch_Queued_Message::bytes_transferred (size_t &byte_count) this->state_changed (TAO_LF_Event::LFS_SUCCESS); } + +TAO_Queued_Message * +TAO_Asynch_Queued_Message::clone (ACE_Allocator *alloc) +{ + char *buf = 0; + + // @@todo: Need to use a memory pool. But certain things need to + // change a bit in this class for that. Till then. + + // Just allocate and copy data that needs to be sent, no point + // copying the whole buffer. + size_t sz = this->size_ - this->offset_; + + ACE_NEW_RETURN (buf, + char[sz], + 0); + + ACE_OS::memcpy (buf, + this->buffer_ + this->offset_, + sz); + + TAO_Asynch_Queued_Message *qm = 0; + + if (alloc) + { + ACE_NEW_MALLOC_RETURN (qm, + ACE_static_cast (TAO_Asynch_Queued_Message *, + alloc->malloc (sizeof (TAO_Asynch_Queued_Message))), + TAO_Asynch_Queued_Message (buf, + sz, + alloc), + 0); + } + else + { + // No allocator, so use the common heap! + if (TAO_debug_level == 4) + { + // This debug is for testing purposes! + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Asynch_Queued_Message::clone\n", + "Using global pool for allocation \n")); + } + + ACE_NEW_RETURN (qm, + TAO_Asynch_Queued_Message (buf, + sz), + 0); + } + + // Set the flag to indicate that <qm> is created on the heap. + if (qm) + qm->is_heap_created_ = 1; + + return qm; +} + void TAO_Asynch_Queued_Message::destroy (void) { - // @@ Maybe it comes from a pool, we should do something about it. - delete this; + if (this->is_heap_created_) + { + // If we have an allocator release the memory to the allocator + // pool. + if (this->allocator_) + { + ACE_DES_FREE (this, + this->allocator_->free, + TAO_Asynch_Queued_Message); + + } + else // global release.. + { + delete this; + } + } } diff --git a/TAO/tao/Asynch_Queued_Message.h b/TAO/tao/Asynch_Queued_Message.h index a2708691abd..4f8ba0a1d10 100644 --- a/TAO/tao/Asynch_Queued_Message.h +++ b/TAO/tao/Asynch_Queued_Message.h @@ -34,6 +34,8 @@ public: /** * @param contents The message block chain that must be sent. * + * @param alloc Allocator used for creating <this> object. + * * @todo I'm almost sure this class will require a callback * interface for AMIs sent with SYNC_NONE policy. Those guys * need to hear when the connection timeouts or closes, but @@ -42,6 +44,8 @@ public: TAO_Asynch_Queued_Message (const ACE_Message_Block *contents, ACE_Allocator *alloc = 0); + + /// Destructor virtual ~TAO_Asynch_Queued_Message (void); @@ -52,9 +56,28 @@ public: virtual int all_data_sent (void) const; virtual void fill_iov (int iovcnt_max, int &iovcnt, iovec iov[]) const; virtual void bytes_transferred (size_t &byte_count); + /// @@NOTE: No reason to belive why this would be called. But have + /// it here for the sake of uniformity. + virtual TAO_Queued_Message *clone (ACE_Allocator *alloc); virtual void destroy (void); //@} +protected: + /// Constructor + /** + * @param buf The buffer that needs to be sent on the wire. The + * buffer will be owned by this class. The buffer will be + * deleted when the destructor is called and hence the + * buffer should always come off the heap! + * + * @param size The size of the buffer <buf> that is being handed + * over. + * + * @param alloc Allocator used for creating <this> object. + */ + TAO_Asynch_Queued_Message (char *buf, + size_t size, + ACE_Allocator *alloc = 0); private: /// The number of bytes in the buffer size_t size_; diff --git a/TAO/tao/ChangeLog b/TAO/tao/ChangeLog index fa198b7dc0b..76a64b84e74 100644 --- a/TAO/tao/ChangeLog +++ b/TAO/tao/ChangeLog @@ -1,3 +1,18 @@ +Sun Jul 14 10:49:33 2002 Balachandran Natarajan <bala@cs.wustl.edu> + + * tao/Queued_Message.h: + * tao/Queued_Message.cpp: Added an argument in the constructor to + pass an allocator. Added a pure virtual method, clone () to the + interface. + + * tao/Synch_Queued_Message.h: + * tao/Synch_Queued_Message.cpp: + * tao/Asynch_Queued_Message.h: + * tao/Asynch_Queued_Message.cpp: Implemented the method clone (). + + * tao/Incoming_Mesage_Queue.cpp: Fixed a typo in a print + statement. + Fri Jul 12 14:10:14 2002 Balachandran Natarajan <bala@cs.wustl.edu> * tao/Transport.h: diff --git a/TAO/tao/Incoming_Message_Queue.cpp b/TAO/tao/Incoming_Message_Queue.cpp index 0331c30d572..ef589e39cb8 100644 --- a/TAO/tao/Incoming_Message_Queue.cpp +++ b/TAO/tao/Incoming_Message_Queue.cpp @@ -194,7 +194,7 @@ TAO_Queued_Data::get_queued_data (ACE_Allocator *alloc) { // This debug is for testing purposes! ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Queued_Data[%d]::get_queued_data\n", + "TAO (%P|%t) - Queued_Data::get_queued_data\n", "Using global pool for allocation \n")); } diff --git a/TAO/tao/Queued_Message.cpp b/TAO/tao/Queued_Message.cpp index 68d308b2eeb..dfb957674f4 100644 --- a/TAO/tao/Queued_Message.cpp +++ b/TAO/tao/Queued_Message.cpp @@ -9,8 +9,10 @@ ACE_RCSID(tao, Queued_Message, "$Id$") -TAO_Queued_Message::TAO_Queued_Message (void) - : next_ (0) +TAO_Queued_Message::TAO_Queued_Message (ACE_Allocator *alloc) + : allocator_ (alloc) + , is_heap_created_ (0) + , next_ (0) , prev_ (0) { } diff --git a/TAO/tao/Queued_Message.h b/TAO/tao/Queued_Message.h index a49bac11b5c..47a98735a4d 100644 --- a/TAO/tao/Queued_Message.h +++ b/TAO/tao/Queued_Message.h @@ -23,7 +23,7 @@ #include "LF_Event.h" class ACE_Message_Block; - +class ACE_Allocator; /** * @class TAO_Queued_Message * @@ -66,7 +66,7 @@ class TAO_Export TAO_Queued_Message : public TAO_LF_Event { public: /// Constructor - TAO_Queued_Message (void); + TAO_Queued_Message (ACE_Allocator *alloc = 0); /// Destructor virtual ~TAO_Queued_Message (void); @@ -143,7 +143,9 @@ public: * method should update this counter * @param iov The io vector */ - virtual void fill_iov (int iovcnt_max, int &iovcnt, iovec iov[]) const = 0; + virtual void fill_iov (int iovcnt_max, + int &iovcnt, + iovec iov[]) const = 0; /// Update the internal state, data has been sent. /** @@ -161,6 +163,18 @@ public: */ virtual void bytes_transferred (size_t &byte_count) = 0; + /// Clone this element + /* + * Clone the element and return a pointer to the cloned element on + * the heap. + * + * @param allocator Use the allocator for creating the new element + * on the heap. Remember, that the allocator will + * not be used allocate the data contained in this + * element. + */ + virtual TAO_Queued_Message *clone (ACE_Allocator *allocator) = 0; + /// Reclaim resources /** * Reliable messages are allocated from the stack, thus they do not @@ -171,6 +185,20 @@ public: virtual void destroy (void) = 0; //@} +protected: + /* + * Allocator that was used to create <this> object on the heap. If the + * allocator is null then <this> is on stack. + */ + ACE_Allocator *allocator_; + + /* + * A flag that acts as a boolean to indicate whether <this> is on + * stack or heap. A non-zero value indicates that <this> was created + * on heap. + */ + int is_heap_created_; + private: /// Implement an intrusive double-linked list for the message queue TAO_Queued_Message *next_; diff --git a/TAO/tao/Synch_Queued_Message.cpp b/TAO/tao/Synch_Queued_Message.cpp index 75aa7a8c7d5..fe6007fc1de 100644 --- a/TAO/tao/Synch_Queued_Message.cpp +++ b/TAO/tao/Synch_Queued_Message.cpp @@ -1,4 +1,6 @@ #include "Synch_Queued_Message.h" +#include "debug.h" +#include "ace/Malloc_T.h" #include "ace/Log_Msg.h" @@ -18,6 +20,8 @@ TAO_Synch_Queued_Message:: TAO_Synch_Queued_Message::~TAO_Synch_Queued_Message (void) { + this->is_heap_created_ = 0; + this->allocator_ = 0; } const ACE_Message_Block * @@ -94,7 +98,71 @@ TAO_Synch_Queued_Message::bytes_transferred (size_t &byte_count) this->state_changed (TAO_LF_Event::LFS_SUCCESS); } +TAO_Queued_Message * +TAO_Synch_Queued_Message::clone (ACE_Allocator *alloc) +{ + TAO_Synch_Queued_Message *qm = 0; + + // Clone the message block. + // NOTE: We wantedly do the cloning from <current_block_> instead of + // starting from <contents_> since we dont want to clone blocks that + // have already been sent on the wire. Waste of memory and + // associated copying. + ACE_Message_Block *mb = + this->current_block_->clone (); + + if (alloc) + { + ACE_NEW_MALLOC_RETURN (qm, + ACE_static_cast (TAO_Synch_Queued_Message *, + alloc->malloc (sizeof (TAO_Synch_Queued_Message))), + TAO_Synch_Queued_Message (mb, + alloc), + 0); + } + else + { + // No allocator, so use the common heap! + if (TAO_debug_level == 4) + { + // This debug is for testing purposes! + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Synch_Queued_Message::clone\n", + "Using global pool for allocation \n")); + } + + ACE_NEW_RETURN (qm, + TAO_Synch_Queued_Message (mb), + 0); + } + + // Set the flag to indicate that <qm> is created on the heap. + if (qm) + qm->is_heap_created_ = 1; + + return qm; +} + void TAO_Synch_Queued_Message::destroy (void) { + if (this->is_heap_created_) + { + ACE_Message_Block::release (this->contents_); + this->current_block_ = 0; + + // If we have an allocator release the memory to the allocator + // pool. + if (this->allocator_) + { + ACE_DES_FREE (this, + this->allocator_->free, + TAO_Synch_Queued_Message); + + } + else // global release.. + { + delete this; + } + } } diff --git a/TAO/tao/Synch_Queued_Message.h b/TAO/tao/Synch_Queued_Message.h index b9f4c315971..a5644b643e5 100644 --- a/TAO/tao/Synch_Queued_Message.h +++ b/TAO/tao/Synch_Queued_Message.h @@ -51,7 +51,8 @@ public: * @param alloc The allocator that is used to allocate objects of * this type. */ - TAO_Synch_Queued_Message (const ACE_Message_Block *contents; + TAO_Synch_Queued_Message (const ACE_Message_Block *contents, + ACE_Allocator *alloc = 0); @@ -67,6 +68,7 @@ public: virtual int all_data_sent (void) const; virtual void fill_iov (int iovcnt_max, int &iovcnt, iovec iov[]) const; virtual void bytes_transferred (size_t &byte_count); + virtual TAO_Queued_Message *clone (ACE_Allocator *alloc); virtual void destroy (void); //@} diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp index eb9e8ea890a..84185cb4599 100644 --- a/TAO/tao/Transport.cpp +++ b/TAO/tao/Transport.cpp @@ -509,6 +509,8 @@ TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb, // the message block. TAO_Synch_Queued_Message synch_message (mb); + synch_message.push_back (this->head_, this->tail_); + int n = this->send_synch_message_helper_i (synch_message, max_wait_time); @@ -597,6 +599,9 @@ TAO_Transport::send_reply_message_i (const ACE_Message_Block *mb, // Dont clone now.. We could be sent in one shot! TAO_Synch_Queued_Message synch_message (mb); + synch_message.push_back (this->head_, + this->tail_); + int n = this->send_synch_message_helper_i (synch_message, max_wait_time); @@ -608,22 +613,40 @@ TAO_Transport::send_reply_message_i (const ACE_Message_Block *mb, ACE_ASSERT (n == 0); // Till this point we shouldnt have any copying and that is the - // point anyway. - // Now, remove the node from the list + // point anyway. Now, remove the node from the list + synch_message.remove_from_list (this->head_, + this->tail_); - // Clone the node + ACE_Message_Block *tmp_mb = + ACE_const_cast (ACE_Message_Block *, + mb); + // Reset the message block allocators to allocate memory from the + // global pool. + tmp_mb->reset_allocators (this->orb_core_->input_cdr_buffer_allocator (), + this->orb_core_->input_cdr_dblock_allocator (), + this->orb_core_->input_cdr_msgblock_allocator ()); + // Clone the node that we have. + TAO_Queued_Message *msg = + synch_message.clone (this->orb_core_->transport_message_buffer_allocator ()); + // Stick it in the queue + msg->push_back (this->head_, + this->tail_); + + TAO_Flushing_Strategy *flushing_strategy = + this->orb_core ()->flushing_strategy (); + (void) flushing_strategy->schedule_output (this); + + return 1; } int TAO_Transport::send_synch_message_helper_i (TAO_Synch_Queued_Message &synch_message, ACE_Time_Value * /*max_wait_time*/) { - synch_message.push_back (this->head_, this->tail_); - // @@todo: Need to send timeouts for writing.. int n = this->drain_queue_i (); if (n == -1) |