diff options
Diffstat (limited to 'TAO/tao')
-rw-r--r-- | TAO/tao/CDR.h | 5 | ||||
-rw-r--r-- | TAO/tao/CDR.i | 9 | ||||
-rw-r--r-- | TAO/tao/IIOP_Connect.cpp | 10 | ||||
-rw-r--r-- | TAO/tao/Invocation.i | 2 | ||||
-rw-r--r-- | TAO/tao/ORB_Core.cpp | 109 | ||||
-rw-r--r-- | TAO/tao/ORB_Core.h | 9 | ||||
-rw-r--r-- | TAO/tao/Reply_Dispatcher.cpp | 32 | ||||
-rw-r--r-- | TAO/tao/Reply_Dispatcher.h | 15 | ||||
-rw-r--r-- | TAO/tao/Transport_Mux_Strategy.cpp | 30 | ||||
-rw-r--r-- | TAO/tao/UIOP_Connect.cpp | 10 |
10 files changed, 127 insertions, 104 deletions
diff --git a/TAO/tao/CDR.h b/TAO/tao/CDR.h index 055923e1e8f..67f26c6d4ac 100644 --- a/TAO/tao/CDR.h +++ b/TAO/tao/CDR.h @@ -215,6 +215,11 @@ public: TAO_ORB_Core* orb_core = 0); // Create an input CDR from an output CDR. + TAO_InputCDR (ACE_InputCDR::Transfer_Contents rhs, + TAO_ORB_Core* orb_core = 0); + // Initialize the contents of one CDR from another, without data + // copying and with minimimum locking overhead. + ~TAO_InputCDR (void); // destructor diff --git a/TAO/tao/CDR.i b/TAO/tao/CDR.i index 950e7bb91a6..94a80cc3ad0 100644 --- a/TAO/tao/CDR.i +++ b/TAO/tao/CDR.i @@ -83,6 +83,15 @@ TAO_InputCDR::TAO_InputCDR (const TAO_InputCDR& rhs) } ACE_INLINE +TAO_InputCDR::TAO_InputCDR (ACE_InputCDR::Transfer_Contents rhs, + TAO_ORB_Core* orb_core) + : ACE_InputCDR (rhs), + orb_core_ (orb_core) +{ + this->init_translators (); +} + +ACE_INLINE TAO_InputCDR::~TAO_InputCDR (void) { } diff --git a/TAO/tao/IIOP_Connect.cpp b/TAO/tao/IIOP_Connect.cpp index c6134c60752..a36d8787db5 100644 --- a/TAO/tao/IIOP_Connect.cpp +++ b/TAO/tao/IIOP_Connect.cpp @@ -291,16 +291,18 @@ TAO_IIOP_Server_Connection_Handler::handle_input_i (ACE_HANDLE, // same Event_Handler in two threads at the same time. // Copy message type. - CORBA::Octet message_type = this->transport_.message_state_.message_type; + TAO_GIOP_Message_State &ms = this->transport_.message_state_; + CORBA::Octet message_type = ms.message_type; // Copy version. - TAO_GIOP_Version giop_version = this->transport_.message_state_.giop_version; + TAO_GIOP_Version giop_version = ms.giop_version; // Steal the input CDR from the message state. - TAO_InputCDR input_cdr (this->transport_.message_state_.cdr); + TAO_InputCDR input_cdr (ACE_InputCDR::Transfer_Contents (ms.cdr), + this->orb_core_); // Reset the message state. - this->transport_.message_state_.reset (); + this->transport_.message_state_.reset (0); result = TAO_GIOP::process_server_message (this->transport (), this->orb_core_, diff --git a/TAO/tao/Invocation.i b/TAO/tao/Invocation.i index 19201f56c3e..d298c178c95 100644 --- a/TAO/tao/Invocation.i +++ b/TAO/tao/Invocation.i @@ -87,5 +87,5 @@ TAO_GIOP_Locate_Request_Invocation (TAO_Stub *stub, ACE_INLINE TAO_InputCDR & TAO_GIOP_Locate_Request_Invocation::inp_stream (void) { - return this->rd_. reply_cdr (); + return this->rd_.reply_cdr (); } diff --git a/TAO/tao/ORB_Core.cpp b/TAO/tao/ORB_Core.cpp index 31541da60a8..3e2b05f06c4 100644 --- a/TAO/tao/ORB_Core.cpp +++ b/TAO/tao/ORB_Core.cpp @@ -1564,6 +1564,18 @@ TAO_Leader_Follower::get_next_follower (void) // **************************************************************** ACE_Allocator* +TAO_ORB_Core::input_cdr_dblock_allocator_i (TAO_ORB_Core_TSS_Resources *tss) +{ + if (tss->input_cdr_dblock_allocator_ == 0) + { + tss->input_cdr_dblock_allocator_ = + this->resource_factory ()->input_cdr_dblock_allocator (); + tss->owns_resources_ = 1; + } + return tss->input_cdr_dblock_allocator_; +} + +ACE_Allocator* TAO_ORB_Core::input_cdr_dblock_allocator (void) { if (this->use_tss_resources_) @@ -1575,13 +1587,7 @@ TAO_ORB_Core::input_cdr_dblock_allocator (void) "TAO_ORB_Core::input_cdr_dblock_allocator (); " "no more TSS keys"), 0); - - if (tss->input_cdr_dblock_allocator_ == 0) - { - tss->input_cdr_dblock_allocator_ = this->resource_factory ()->input_cdr_dblock_allocator (); - tss->owns_resources_ = 1; - } - return tss->input_cdr_dblock_allocator_; + return this->input_cdr_dblock_allocator_i (tss); } if (this->orb_resources_.input_cdr_dblock_allocator_ == 0) @@ -1599,6 +1605,18 @@ TAO_ORB_Core::input_cdr_dblock_allocator (void) } ACE_Allocator* +TAO_ORB_Core::input_cdr_buffer_allocator_i (TAO_ORB_Core_TSS_Resources *tss) +{ + if (tss->input_cdr_buffer_allocator_ == 0) + { + tss->input_cdr_buffer_allocator_ = + this->resource_factory ()->input_cdr_buffer_allocator (); + tss->owns_resources_ = 1; + } + return tss->input_cdr_buffer_allocator_; +} + +ACE_Allocator* TAO_ORB_Core::input_cdr_buffer_allocator (void) { if (this->use_tss_resources_) @@ -1611,12 +1629,7 @@ TAO_ORB_Core::input_cdr_buffer_allocator (void) "no more TSS keys"), 0); - if (tss->input_cdr_buffer_allocator_ == 0) - { - tss->input_cdr_buffer_allocator_ = this->resource_factory ()->input_cdr_buffer_allocator (); - tss->owns_resources_ = 1; - } - return tss->input_cdr_buffer_allocator_; + return this->input_cdr_buffer_allocator_i (tss); } if (this->orb_resources_.input_cdr_buffer_allocator_ == 0) @@ -1716,45 +1729,51 @@ TAO_ORB_Core::create_input_cdr_data_block (size_t size) { ACE_Data_Block *nb = 0; - ACE_Allocator *dblock_allocator = - this->input_cdr_dblock_allocator (); - ACE_Allocator *buffer_allocator = - this->input_cdr_buffer_allocator (); + ACE_Allocator *dblock_allocator; + ACE_Allocator *buffer_allocator; - if (this->resource_factory ()->use_locked_data_blocks ()) + if (this->use_tss_resources_) { - typedef - ACE_Locked_Data_Block<ACE_Lock_Adapter<ACE_SYNCH_MUTEX> > - Locked_Data_Block; - - ACE_NEW_MALLOC_RETURN ( - nb, - ACE_static_cast (Locked_Data_Block *, - dblock_allocator->malloc (sizeof (Locked_Data_Block))), - Locked_Data_Block (size, - ACE_Message_Block::MB_DATA, - 0, - buffer_allocator, - 0, - dblock_allocator), - 0); + TAO_ORB_Core_TSS_Resources *tss = this->get_tss_resources (); + if (tss == 0) + ACE_ERROR_RETURN ((LM_ERROR, + "(%P|%t) %p\n", + "TAO_ORB_Core::create_input_cdr_data_block (); " + "no more TSS keys"), + 0); + + dblock_allocator = + this->input_cdr_dblock_allocator_i (tss); + buffer_allocator = + this->input_cdr_buffer_allocator_i (tss); } else { - ACE_NEW_MALLOC_RETURN ( - nb, - ACE_static_cast(ACE_Data_Block*, - dblock_allocator->malloc (sizeof (ACE_Data_Block))), - ACE_Data_Block (size, - ACE_Message_Block::MB_DATA, - 0, - buffer_allocator, - 0, - 0, - dblock_allocator), - 0); + dblock_allocator = + this->input_cdr_dblock_allocator (); + buffer_allocator = + this->input_cdr_buffer_allocator (); + } + + ACE_Lock* lock_strategy = 0; + if (this->resource_factory ()->use_locked_data_blocks ()) + { + lock_strategy = &this->data_block_lock_; } + ACE_NEW_MALLOC_RETURN ( + nb, + ACE_static_cast(ACE_Data_Block*, + dblock_allocator->malloc (sizeof (ACE_Data_Block))), + ACE_Data_Block (size, + ACE_Message_Block::MB_DATA, + 0, + buffer_allocator, + lock_strategy, + 0, + dblock_allocator), + 0); + return nb; } diff --git a/TAO/tao/ORB_Core.h b/TAO/tao/ORB_Core.h index eaafcaae57f..f544f6c0c6b 100644 --- a/TAO/tao/ORB_Core.h +++ b/TAO/tao/ORB_Core.h @@ -461,6 +461,12 @@ protected: TAO_Object_Adapter *object_adapter_i (void); // Get <Object Adapter>, assume the lock is held... + ACE_Allocator *input_cdr_dblock_allocator_i (TAO_ORB_Core_TSS_Resources*); + ACE_Allocator *input_cdr_buffer_allocator_i (TAO_ORB_Core_TSS_Resources*); + // Implement the input_cdr_dbblock_allocator() routines using + // pre-fetched TSS resources, this minimizes the number of calls to + // them. + protected: ACE_SYNCH_MUTEX lock_; // Synchronize internal state... @@ -586,6 +592,9 @@ protected: int thread_per_connection_use_timeout_; ACE_Time_Value thread_per_connection_timeout_; // The value of the timeout if the flag above is not zero + + ACE_Lock_Adapter<ACE_SYNCH_MUTEX> data_block_lock_; + // The data block reference counts are locked using this mutex }; // **************************************************************** diff --git a/TAO/tao/Reply_Dispatcher.cpp b/TAO/tao/Reply_Dispatcher.cpp index 1ecb5c3782c..98e851ee961 100644 --- a/TAO/tao/Reply_Dispatcher.cpp +++ b/TAO/tao/Reply_Dispatcher.cpp @@ -21,7 +21,7 @@ TAO_Reply_Dispatcher::~TAO_Reply_Dispatcher (void) } TAO_GIOP_Message_State * -TAO_Reply_Dispatcher::message_state (void) const +TAO_Reply_Dispatcher::message_state (void) { return 0; } @@ -37,10 +37,7 @@ TAO_Reply_Dispatcher::leader_follower_condition_variable (TAO_Transport *) // Constructor. TAO_Synch_Reply_Dispatcher::TAO_Synch_Reply_Dispatcher (TAO_ORB_Core *orb_core) - : message_state_ (0), - reply_cdr_ (orb_core->create_input_cdr_data_block (ACE_CDR::DEFAULT_BUFSIZE), - TAO_ENCAP_BYTE_ORDER, - orb_core), + : message_state_ (orb_core), reply_received_ (0), leader_follower_condition_variable_ (0), orb_core_ (orb_core) @@ -63,7 +60,6 @@ TAO_Synch_Reply_Dispatcher::dispatch_reply (CORBA::ULong reply_status, this->reply_status_ = reply_status; this->version_ = version; - this->message_state_ = message_state; // Steal the buffer, that way we don't do any unnecesary copies of // this data. @@ -72,8 +68,17 @@ TAO_Synch_Reply_Dispatcher::dispatch_reply (CORBA::ULong reply_status, IOP::ServiceContext* context_list = reply_ctx.get_buffer (1); this->reply_service_info_.replace (max, len, context_list, 1); - // Steal the buffer so that no copying is done. - this->reply_cdr_.steal_from (message_state->cdr); + if (&this->message_state_ != message_state) + { + // The Transport Mux Strategy did not use our Message_State to + // receive the event, possibly because it is muxing multiple + // requests over the same connection. + + // Steal the buffer so that no copying is done. + this->message_state_.cdr.steal_from (message_state->cdr); + + // There is no need to copy the other fields! + } // If condition variable is present, then we are doing leader // follower model. Do all the nessary things. @@ -98,21 +103,21 @@ TAO_Synch_Reply_Dispatcher::dispatch_reply (CORBA::ULong reply_status, } TAO_GIOP_Message_State * -TAO_Synch_Reply_Dispatcher::message_state (void) const +TAO_Synch_Reply_Dispatcher::message_state (void) { - return this->message_state_; + return &this->message_state_; } TAO_InputCDR & TAO_Synch_Reply_Dispatcher::reply_cdr (void) { - return this->reply_cdr_; + return this->message_state_.cdr; } int & TAO_Synch_Reply_Dispatcher::reply_received (void) { - return reply_received_; + return this->reply_received_; } int @@ -188,8 +193,9 @@ TAO_Asynch_Reply_Dispatcher::dispatch_reply (CORBA::ULong reply_status, } TAO_GIOP_Message_State * -TAO_Asynch_Reply_Dispatcher::message_state (void) const +TAO_Asynch_Reply_Dispatcher::message_state (void) { return this->message_state_; } + #endif /* TAO_HAS_CORBA_MESSAGING && TAO_POLLER */ diff --git a/TAO/tao/Reply_Dispatcher.h b/TAO/tao/Reply_Dispatcher.h index f2dbc478ad2..41682c3609f 100644 --- a/TAO/tao/Reply_Dispatcher.h +++ b/TAO/tao/Reply_Dispatcher.h @@ -53,7 +53,7 @@ public: TAO_GIOP_Message_State* message_state) = 0; // Dispatch the reply. Return 1 on sucess, -1 on error. - virtual TAO_GIOP_Message_State *message_state (void) const; + virtual TAO_GIOP_Message_State *message_state (void); // Get the Message State into which the reply has been read. const IOP::ServiceContextList& reply_service_info () const; @@ -102,7 +102,7 @@ public: // stack. // Return 1 on sucess, -1 on error. - virtual TAO_GIOP_Message_State *message_state (void) const; + virtual TAO_GIOP_Message_State *message_state (void); // Return the message state of this invocation. virtual TAO_InputCDR &reply_cdr (void); @@ -125,13 +125,8 @@ private: TAO_GIOP_Version version_; // The version - TAO_GIOP_Message_State *message_state_; - // CDR stream for reading the input. - // @@ Carlos : message_state should go away. All we need is the - // reply cdr. Is that rite? (Alex). - - TAO_InputCDR reply_cdr_; - // CDR where the reply message is placed. + TAO_GIOP_Message_State message_state_; + // All the state required to receive the input... int reply_received_; // Flag that indicates the reply has been received. @@ -178,7 +173,7 @@ public: // handler. // Return 1 on sucess, -1 on error. - virtual TAO_GIOP_Message_State *message_state (void) const; + virtual TAO_GIOP_Message_State *message_state (void); // Return the message state. private: diff --git a/TAO/tao/Transport_Mux_Strategy.cpp b/TAO/tao/Transport_Mux_Strategy.cpp index ff3f3d310ad..58f872de9b8 100644 --- a/TAO/tao/Transport_Mux_Strategy.cpp +++ b/TAO/tao/Transport_Mux_Strategy.cpp @@ -79,10 +79,6 @@ TAO_Exclusive_TMS::dispatch_reply (CORBA::ULong request_id, IOP::ServiceContextList& reply_ctx, TAO_GIOP_Message_State* message_state) { - // There can be only one message state possible. Just do a sanity - // check here. - ACE_ASSERT (message_state == &(this->message_state_)); - // Check the ids. if (this->request_id_ != request_id) { @@ -103,36 +99,16 @@ TAO_Exclusive_TMS::dispatch_reply (CORBA::ULong request_id, reply_ctx, message_state); - // Idle the transport now. - // if (this->transport_ != 0) - // this->transport_->idle (); - // @@ Carlos : We can do this, in the Muxed Leader Follower - // implementation. In the older implementation, since the state - // variables are in the Transport, and since we are in the - // handle_input right now, we cannot idle the Transport. This - // means that I cannot use asynchronous requests with Exclusive - // Transport&Old Leader Follower implementation , because I dont - // know when to idle the Transport. - // So I am moving this <idle> call to the destructors of - // synchronous invocations and for asynchronous invocations - // idle'ing is not at all called after the reply is - // received. - // We can enable <idle> out here, once we get rid of the old - // Leader Follower implementation. Then we can get rid of the - // destructors in the Invocation classes and they dont have to - // call <idle>. - // Do I make sense? (Alex). - return result; } TAO_GIOP_Message_State * TAO_Exclusive_TMS::get_message_state (void) { - if (this->rd_ == 0) - return 0; + if (this->rd_ != 0) + return this->rd_->message_state (); - return &(this->message_state_); + return &this->message_state_; } void diff --git a/TAO/tao/UIOP_Connect.cpp b/TAO/tao/UIOP_Connect.cpp index cf76748a3de..f9ec7acfdba 100644 --- a/TAO/tao/UIOP_Connect.cpp +++ b/TAO/tao/UIOP_Connect.cpp @@ -278,16 +278,18 @@ TAO_UIOP_Server_Connection_Handler::handle_input_i (ACE_HANDLE, // same Event_Handler in two threads at the same time. // Copy message type. - CORBA::Octet message_type = this->transport_.message_state_.message_type; + TAO_GIOP_Message_State &ms = this->transport_.message_state_; + CORBA::Octet message_type = ms.message_type; // Copy version. - TAO_GIOP_Version giop_version = this->transport_.message_state_.giop_version; + TAO_GIOP_Version giop_version = ms.giop_version; // Steal the input CDR from the message state. - TAO_InputCDR input_cdr (this->transport_.message_state_.cdr); + TAO_InputCDR input_cdr (ACE_InputCDR::Transfer_Contents (ms.cdr), + this->orb_core_); // Reset the message state. - this->transport_.message_state_.reset (); + this->transport_.message_state_.reset (0); result = TAO_GIOP::process_server_message (this->transport (), this->orb_core_, |