From 268699755fb3f4d136f1e7b3b908e8d06345eb9e Mon Sep 17 00:00:00 2001 From: bala Date: Wed, 2 May 2001 21:53:44 +0000 Subject: ChangeLogTag: Wed May 2 16:52:09 2001 Balachandran Natarajan --- TAO/ChangeLogs/ChangeLog-02a | 49 +++++++++++++++++++++++ TAO/tao/GIOP_Message_Base.cpp | 86 ++++++++++++---------------------------- TAO/tao/GIOP_Message_Handler.cpp | 49 ++++++++++++++++++++++- TAO/tao/GIOP_Message_Handler.h | 9 +++-- TAO/tao/GIOP_Message_Handler.inl | 23 ----------- TAO/tao/ORB_Core.cpp | 82 ++++++++++++++++++++++++++++++++++++++ TAO/tao/ORB_Core.h | 29 ++++++++++++++ 7 files changed, 239 insertions(+), 88 deletions(-) diff --git a/TAO/ChangeLogs/ChangeLog-02a b/TAO/ChangeLogs/ChangeLog-02a index ba1a2d32d33..a463af8816b 100644 --- a/TAO/ChangeLogs/ChangeLog-02a +++ b/TAO/ChangeLogs/ChangeLog-02a @@ -1,3 +1,52 @@ +Wed May 2 16:52:09 2001 Balachandran Natarajan + + * tao/ORB_Core.cpp: + * tao/ORB_Core.h: Added new methods that could dish out memory + always from the global pool. The motivation for a new methods + are: + (1) In olden days out CDR's were created only in the invocation + path and destroyed during the invocation path. If the ORB is + configured with TSS, it made sense to get the allocators + from the TSS. That doesn't hold true anymore as illustrated + by point (2). + + (2) We now cache CDR streams, message blocks in classes within + the connection handlers. These objects are on a + per-connection basis as opposed to per-thread basis (though + the thread may very well create the connections). We want + the connections to be reused by other threads and so we + cannot destroy the connections when the threads go + away. + + The above two reasons motivated adding new methods that dish out + memory from the global pool alone, even if the ORB has been + configured to use TSS. + + * tao/GIOP_Message_Base.cpp: Changes for bug id 871. We were + trying to use a constructor for an InputCDR stream that made a + copy from a message block. The process happened in two steps. It + first allocated memory and then copied the contents. Both of + them are bad. To make things efficient we use only the data + block in which the incoming stream has been read to create a CDR + stream. The same stream is passed onto the higher layers of the + ORB. The stream *never* gets copied. The stream that is handled + by the Reply_Dispatcher only uses this very same data block to + pass it onto the stubs. So this should give us a neat path with + no-copies to the stubs. But there is one release that can be + removed which will be done in the next cycle. + + * tao/GIOP_Message_Handler.cpp: + * tao/GIOP_Message_Handler.h: + * tao/GIOP_Message_Handler.inl: Added a new method + steal_data_block (). This gives ownership away of the existing + data_block and creates a new data_block for itself. Further the + message_blocks use the memory from the global memory pool so + that we dont have problems when the message blocks loose + ownership to other threads. + + We have one more release in the critical path that can be + removed. Once that is removed we can close #871. + Wed May 2 15:14:13 2001 Balachandran Natarajan * tao/CDR.h (TAO_InputCDR): diff --git a/TAO/tao/GIOP_Message_Base.cpp b/TAO/tao/GIOP_Message_Base.cpp index 9465274c87f..3024636232b 100644 --- a/TAO/tao/GIOP_Message_Base.cpp +++ b/TAO/tao/GIOP_Message_Base.cpp @@ -23,8 +23,6 @@ TAO_GIOP_Message_Base::TAO_GIOP_Message_Base (TAO_ORB_Core *orb_core, this, input_cdr_size), output_ (0), - cdr_buffer_alloc_ (orb_core->resource_factory ()->output_cdr_buffer_allocator ()), - cdr_dblock_alloc_ (orb_core->resource_factory ()->output_cdr_dblock_allocator ()), generator_parser_ (0) { #if defined(ACE_HAS_PURIFY) @@ -36,8 +34,8 @@ TAO_GIOP_Message_Base::TAO_GIOP_Message_Base (TAO_ORB_Core *orb_core, TAO_OutputCDR (this->repbuf_, sizeof this->repbuf_, TAO_ENCAP_BYTE_ORDER, - this->cdr_buffer_alloc_, - this->cdr_dblock_alloc_, + orb_core->message_block_buffer_allocator (), + orb_core->message_block_dblock_allocator (), orb_core->orb_params ()->cdr_memcpy_tradeoff (), orb_core->to_iso8859 (), orb_core->to_unicode ())); @@ -46,18 +44,7 @@ TAO_GIOP_Message_Base::TAO_GIOP_Message_Base (TAO_ORB_Core *orb_core, TAO_GIOP_Message_Base::~TAO_GIOP_Message_Base (void) { - // Explicitly call the destructor of the output CDR first. They need - // the allocators during destruction. delete this->output_; - - // Then call the destructor of our allocators - if (this->cdr_dblock_alloc_ != 0) - this->cdr_dblock_alloc_->remove (); - delete this->cdr_dblock_alloc_; - - if (this->cdr_buffer_alloc_ != 0) - this->cdr_buffer_alloc_->remove (); - delete this->cdr_buffer_alloc_; } @@ -315,39 +302,21 @@ TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport, // @@@@Is it necessary here? this->output_->reset (); - /************************************************************/ - // @@ This comment was there when we were using multiple reads. Let - // it be here till a point it doesn't make sense _ bala + // Get the read and write positions before we steal data. + size_t rd_pos = this->message_handler_.rd_pos (); + size_t wr_pos = this->message_handler_.wr_pos (); - // Take out all the information from the and reset - // it so that nested upcall on the same transport can be handled. - // - // Notice that the message_state is only modified in one thread at a - // time because the reactor does not call handle_input() for the - // same Event_Handler in two threads at the same time. - /************************************************************/ - - // Create a message block by stealing the data block - ACE_Message_Block msg_block (this->message_handler_.data_block_dup ()); - - // Move the wr_ptr () and rd_ptr in the message block. This is not - // generally required as we are not going to write anything. But - // this is *important* for checking the length of the CDR streams - msg_block.wr_ptr (this->message_handler_.wr_pos ()); - msg_block.rd_ptr (this->message_handler_.rd_pos ()); - - // Steal the input CDR from the message block - TAO_InputCDR input_cdr (&msg_block, + + // Create a input CDR stream. + // NOTE: We use the same data block in which we read the message and + // we pass it on to the higher layers of the ORB. So we dont to any + // copies at all here. The same is alos done in the higher layers. + TAO_InputCDR input_cdr (this->message_handler_.steal_data_block (), + rd_pos, + wr_pos, this->message_handler_.message_state ().byte_order, orb_core); - // input_cdr.skip_bytes (TAO_GIOP_MESSAGE_HEADER_LEN); - - // Send the message state for the service layer like FT to log the - // messages - // @@@ Needed for DOORS - // orb_core->services_log_msg_rcv (this->message_state_); - // Reset the message handler to receive upcalls if any this->message_handler_.reset (0); @@ -377,23 +346,20 @@ TAO_GIOP_Message_Base::process_reply_message ( TAO_Pluggable_Reply_Params ¶ms ) { - // Create a message block by stealing the data block - ACE_Message_Block msg_block (this->message_handler_.data_block_dup ()); + // Get the read and write positions before we steal data. + size_t rd_pos = this->message_handler_.rd_pos (); + size_t wr_pos = this->message_handler_.wr_pos (); - // ACE_CDR::mb_align (&msg_block); - // Move the wr_ptr () and rd_ptr in the message block. This is not - // generally required as we are not going to write anything. But - // this is *important* for checking the length of the CDR streams - // size_t n = this->message_handler_.message_state ().message_size; - msg_block.wr_ptr (this->message_handler_.wr_pos ()); - msg_block.rd_ptr (this->message_handler_.rd_pos ()); + // Create a input CDR stream. + // NOTE: We use the same data block in which we read the message and + // we pass it on to the higher layers of the ORB. So we dont to any + // copies at all here. The same is alos done in the higher layers. + TAO_InputCDR input_cdr (this->message_handler_.steal_data_block (), + rd_pos, + wr_pos, + this->message_handler_.message_state ().byte_order); - // Steal the input CDR from the message block - int byte_order = this->message_handler_.message_state ().byte_order; - TAO_InputCDR input_cdr (&msg_block, byte_order); - - // input_cdr.skip_bytes (TAO_GIOP_MESSAGE_HEADER_LEN); // Reset the message state. Now, we are ready for the next nested // upcall if any. @@ -623,8 +589,8 @@ TAO_GIOP_Message_Base::process_request (TAO_Transport *transport, int result = 0; if (response_required) - { - CORBA::UNKNOWN exception (CORBA::SystemException::_tao_minor_code + { + CORBA::UNKNOWN exception (CORBA::SystemException::_tao_minor_code (TAO_UNHANDLED_SERVER_CXX_EXCEPTION, 0), CORBA::COMPLETED_MAYBE); diff --git a/TAO/tao/GIOP_Message_Handler.cpp b/TAO/tao/GIOP_Message_Handler.cpp index 2ce7bd877b5..8681bf8b749 100644 --- a/TAO/tao/GIOP_Message_Handler.cpp +++ b/TAO/tao/GIOP_Message_Handler.cpp @@ -20,11 +20,21 @@ TAO_GIOP_Message_Handler::TAO_GIOP_Message_Handler (TAO_ORB_Core * orb_core, : mesg_base_ (base), message_status_ (TAO_GIOP_WAITING_FOR_HEADER), message_size_ (input_cdr_size), - current_buffer_ (orb_core->create_input_cdr_data_block (input_cdr_size)), - supp_buffer_ (orb_core->create_input_cdr_data_block (input_cdr_size)), + current_buffer_ (orb_core->data_block_for_message_block (input_cdr_size)), + supp_buffer_ (orb_core->data_block_for_message_block (input_cdr_size)), message_state_ (orb_core), orb_core_ (orb_core) { + // NOTE: The message blocks here use a locked allocator which is not + // from the TSS even if there is one. We are getting the allocators + // from the global memory. We shouldn't be using the TSS stuff for + // the following reason + // (a) The connection handlers are per-connection and not + // per-thread. + // (b) The order of cleaning is important if we use allocators from + // TSS. The TSS goes away when the threads go away. But the + // connection handlers go away only when the ORB decides to shut + // it down. ACE_CDR::mb_align (&this->current_buffer_); // Calculate the effective message after alignment @@ -532,3 +542,38 @@ TAO_GIOP_Message_Handler::read_messages (TAO_Transport *transport) // Success return 1; } + + +ACE_Data_Block * +TAO_GIOP_Message_Handler::steal_data_block (void) +{ + ACE_Data_Block *db = + this->current_buffer_.data_block ()->clone_nocopy (); + + ACE_Data_Block *old_db = + this->current_buffer_.replace_data_block (db); + + ACE_CDR::mb_align (&this->current_buffer_); + + return old_db; +} + + +void +TAO_GIOP_Message_Handler::reset (int reset_flag) +{ + // Reset the contents of the message state + this->message_state_.reset (reset_flag); + + // Reset the current buffer + this->current_buffer_.reset (); + + ACE_CDR::mb_align (&this->current_buffer_); + + if (this->message_status_ != TAO_GIOP_MULTIPLE_MESSAGES) + { + this->supp_buffer_.reset (); + ACE_CDR::mb_align (&this->supp_buffer_); + } + +} diff --git a/TAO/tao/GIOP_Message_Handler.h b/TAO/tao/GIOP_Message_Handler.h index 07037a6f30e..9ccae8b2488 100644 --- a/TAO/tao/GIOP_Message_Handler.h +++ b/TAO/tao/GIOP_Message_Handler.h @@ -86,7 +86,7 @@ public: /// Reset the contents of the if no more requests /// need to be processed. We reset the contents of the /// to parse and process the next request. - void reset (int reset_flag); + void reset (int reset_flag = 0); /// Return the underlying message state TAO_GIOP_Message_State &message_state (void); @@ -94,8 +94,10 @@ public: /// Return the pointer to the data block within the message block ACE_Data_Block *data_block (void) const; - /// Return the pointer to the datablock by duplicating it. - ACE_Data_Block *data_block_dup (void); + /// Return the underlying data block of the . At + /// the sametime making a new data_block for itself. The read and + /// write pointer positions would be reset. + ACE_Data_Block *steal_data_block (void); /// Return the rd_ptr of the char *rd_ptr (void) const; @@ -132,6 +134,7 @@ private: /// the buffer appropriately. int read_messages (TAO_Transport *transport); + private: /// The pointer to the object that holds us diff --git a/TAO/tao/GIOP_Message_Handler.inl b/TAO/tao/GIOP_Message_Handler.inl index 561efaf6a2f..89a3b3679e6 100644 --- a/TAO/tao/GIOP_Message_Handler.inl +++ b/TAO/tao/GIOP_Message_Handler.inl @@ -13,31 +13,8 @@ TAO_GIOP_Message_Handler::data_block (void) const return this->current_buffer_.data_block (); } -ACE_INLINE ACE_Data_Block * -TAO_GIOP_Message_Handler::data_block_dup (void) -{ - return this->current_buffer_.data_block ()->duplicate (); -} -ACE_INLINE void -TAO_GIOP_Message_Handler::reset (int /*reset_flag*/) -{ - // Reset the contents of the message state - this->message_state_.reset (0); - - // Reset the current buffer - this->current_buffer_.reset (); - ACE_CDR::mb_align (&this->current_buffer_); - - if (this->message_status_ != TAO_GIOP_MULTIPLE_MESSAGES) - { - this->supp_buffer_.reset (); - ACE_CDR::mb_align (&this->supp_buffer_); - } - -} - ACE_INLINE char * TAO_GIOP_Message_Handler::rd_ptr (void) const { diff --git a/TAO/tao/ORB_Core.cpp b/TAO/tao/ORB_Core.cpp index 1390ffe0767..30443f78a64 100644 --- a/TAO/tao/ORB_Core.cpp +++ b/TAO/tao/ORB_Core.cpp @@ -98,6 +98,8 @@ TAO_ORB_Core::TAO_ORB_Core (const char *orbid) object_ref_table_ (), orbid_ (ACE_OS::strdup (orbid ? orbid : "")), resource_factory_ (0), + message_block_dblock_allocator_ (0), + message_block_buffer_allocator_ (0), resource_factory_from_service_config_ (0), // @@ This is not needed since the default resource factory, fredk // is statically added to the service configurator. @@ -1206,6 +1208,14 @@ TAO_ORB_Core::fini (void) if (this->resource_factory_ != 0) this->resource_factory_->reclaim_reactor (this->reactor_); + if (this->message_block_dblock_allocator_) + this->message_block_dblock_allocator_->remove (); + delete this->message_block_dblock_allocator_; + + if (this-> message_block_buffer_allocator_) + this->message_block_buffer_allocator_->remove (); + delete this->message_block_buffer_allocator_; + (void) TAO_Internal::close_services (); delete this->reactor_registry_; @@ -2580,6 +2590,78 @@ TAO_ORB_Core::create_input_cdr_data_block (size_t size) lock_strategy = &this->data_block_lock_; } + return this->create_data_block_i (size, + buffer_allocator, + dblock_allocator, + lock_strategy); +} + +ACE_Data_Block * +TAO_ORB_Core::data_block_for_message_block (size_t size) +{ + ACE_Data_Block *nb = 0; + + ACE_Allocator *dblock_allocator; + ACE_Allocator *buffer_allocator; + + /// @@ We are using the input CDR configurations for this. In a + /// @@ generic sense we should be using dishing out datablocks to + /// @@ the input and output CDR folks. Will get to this when we have + /// @@ time -- NB + dblock_allocator = + this->message_block_dblock_allocator (); + buffer_allocator = + this->message_block_buffer_allocator (); + + ACE_Lock* lock_strategy = 0; + if (this->resource_factory ()->use_locked_data_blocks ()) + { + lock_strategy = &this->data_block_lock_; + } + + return this->create_data_block_i (size, + buffer_allocator, + dblock_allocator, + lock_strategy); +} + +ACE_Allocator* +TAO_ORB_Core::message_block_dblock_allocator (void) +{ + if (this->message_block_dblock_allocator_ == 0) + { + // Double checked locking + ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, 0); + if (this->message_block_dblock_allocator_ == 0) + this->message_block_dblock_allocator_ = + this->resource_factory ()->input_cdr_dblock_allocator (); + } + return this->message_block_dblock_allocator_; +} + +ACE_Allocator* +TAO_ORB_Core::message_block_buffer_allocator (void) +{ + if (this->message_block_buffer_allocator_ == 0) + { + // Double checked locking + ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, 0); + if (this->message_block_buffer_allocator_ == 0) + this->message_block_buffer_allocator_ = + this->resource_factory ()->input_cdr_buffer_allocator (); + } + return this->message_block_buffer_allocator_; +} + + +ACE_Data_Block * +TAO_ORB_Core::create_data_block_i (size_t size, + ACE_Allocator *buffer_allocator, + ACE_Allocator *dblock_allocator, + ACE_Lock *lock_strategy) +{ + ACE_Data_Block *nb = 0; + ACE_NEW_MALLOC_RETURN ( nb, ACE_static_cast(ACE_Data_Block*, diff --git a/TAO/tao/ORB_Core.h b/TAO/tao/ORB_Core.h index 07ffd89ea90..b1e36a5bcf4 100644 --- a/TAO/tao/ORB_Core.h +++ b/TAO/tao/ORB_Core.h @@ -413,10 +413,28 @@ public: /// for allocating the buffers used in *outgoing* CDR streams. ACE_Allocator *input_cdr_buffer_allocator (void); + /// This allocator is global, may or may not have locks. It is + /// intended for ACE_Data_Blocks used in message blocks or CDR + /// streams that have no relation with the life of threads, + /// something like used in a class on a per connection basis + ACE_Allocator *message_block_dblock_allocator (void); + + /// This allocator is global, may or may not have locks. It is + /// intended for ACE_Data_Blocks used in message blocks or CDR + /// streams that have no relation with the life of threads, + /// something like used in a class on a per connection basis + ACE_Allocator *message_block_buffer_allocator (void); + /// The Message Blocks used for input CDRs must have appropiate /// locking strategies. ACE_Data_Block *create_input_cdr_data_block (size_t size); + + /// The data blocks returned have memeory from the global pool. Will + /// not get anything from the TSS even if it is available. + ACE_Data_Block *data_block_for_message_block (size_t size); + + #if (TAO_HAS_CORBA_MESSAGING == 1) /// Accessor method for the default_policies_ @@ -869,6 +887,11 @@ protected: ACE_Allocator *input_cdr_buffer_allocator_i (TAO_ORB_Core_TSS_Resources *); //@} + /// Routine that creates a ACE_Data_Block given the lock and allocators. + ACE_Data_Block *create_data_block_i (size_t size, + ACE_Allocator *buffer_allocator, + ACE_Allocator *dblock_allocator, + ACE_Lock *lock); #if (TAO_HAS_RT_CORBA == 1) /// Obtain and cache the RT_ORB factory object reference @@ -999,6 +1022,12 @@ protected: /// Handle to the factory for resource information.. TAO_Resource_Factory *resource_factory_; + /// The allocators for the message blocks + //@{ + ACE_Allocator *message_block_dblock_allocator_; + ACE_Allocator *message_block_buffer_allocator_; + //@} + // Name of the resource factory that needs to be instantiated. // The default value is "Resource_Factory". If TAO_Strategies is // linked, the set_resource_factory will be called to set the value -- cgit v1.2.1