From f074d2dd656b6e91d8ff9faaf7c72148dc43407a Mon Sep 17 00:00:00 2001 From: alex Date: Tue, 13 Jul 1999 21:16:21 +0000 Subject: ChangeLogTag : Tue Jul 13 16:15:43 1999 Alexander Babu Arulanthu --- TAO/tao/IIOP_Connect.cpp | 12 +-- TAO/tao/IIOP_Connect.h | 7 -- TAO/tao/ORB_Core.cpp | 28 +++++- TAO/tao/ORB_Core.h | 11 ++- TAO/tao/Pluggable.cpp | 6 ++ TAO/tao/Pluggable.h | 5 + TAO/tao/Reply_Dispatcher.cpp | 25 ++++- TAO/tao/Reply_Dispatcher.h | 19 +++- TAO/tao/UIOP_Connect.cpp | 17 +--- TAO/tao/UIOP_Connect.h | 6 -- TAO/tao/Wait_Strategy.cpp | 227 +++++++++++++++++++++++++++++++++++++++++-- TAO/tao/Wait_Strategy.h | 13 ++- 12 files changed, 325 insertions(+), 51 deletions(-) diff --git a/TAO/tao/IIOP_Connect.cpp b/TAO/tao/IIOP_Connect.cpp index a1e1448599d..db71e7f2f50 100644 --- a/TAO/tao/IIOP_Connect.cpp +++ b/TAO/tao/IIOP_Connect.cpp @@ -66,12 +66,7 @@ TAO_IIOP_Server_Connection_Handler::TAO_IIOP_Server_Connection_Handler (ACE_Thre : TAO_IIOP_Handler_Base (t), transport_ (this, 0), orb_core_ (0), - tss_resources_ (0), - // This will bomb if get called. But this constructor shouldnt be - // called anyway. - input_cdr_ (orb_core_->create_input_cdr_data_block (ACE_CDR::DEFAULT_BUFSIZE), - TAO_ENCAP_BYTE_ORDER, - orb_core_) + tss_resources_ (0) { // This constructor should *never* get called, it is just here to // make the compiler happy: the default implementation of the @@ -85,10 +80,7 @@ TAO_IIOP_Server_Connection_Handler::TAO_IIOP_Server_Connection_Handler (TAO_ORB_ : TAO_IIOP_Handler_Base (orb_core), transport_ (this, orb_core), orb_core_ (orb_core), - tss_resources_ (orb_core->get_tss_resources ()), - input_cdr_ (orb_core->create_input_cdr_data_block (ACE_CDR::DEFAULT_BUFSIZE), - TAO_ENCAP_BYTE_ORDER, - orb_core) + tss_resources_ (orb_core->get_tss_resources ()) { } diff --git a/TAO/tao/IIOP_Connect.h b/TAO/tao/IIOP_Connect.h index ec46b43c06b..c53c2315c84 100644 --- a/TAO/tao/IIOP_Connect.h +++ b/TAO/tao/IIOP_Connect.h @@ -149,13 +149,6 @@ protected: TAO_ORB_Core_TSS_Resources *tss_resources_; // Cached tss resources of the ORB that activated this object. - - TAO_InputCDR input_cdr_; - // CDR used to steal the input cdr contents from the message - // state. This is done so that we can reset the message state - // before making the upcall. This makes the transport to handle the - // Nestedupcall requests coming on the same socket. - }; #if defined (__ACE_INLINE__) diff --git a/TAO/tao/ORB_Core.cpp b/TAO/tao/ORB_Core.cpp index 403e1437daf..0b912799d0e 100644 --- a/TAO/tao/ORB_Core.cpp +++ b/TAO/tao/ORB_Core.cpp @@ -1303,6 +1303,28 @@ TAO_ORB_Core::object_adapter_i (void) return this->object_adapter_; } +ACE_SYNCH_CONDITION* +TAO_ORB_Core::leader_follower_condition_variable (void) +{ + // Always using TSS. + + // Get tss key. + TAO_ORB_Core_TSS_Resources *tss = this->get_tss_resources (); + + if (tss->leader_follower_condition_variable_ == 0) + { + // Create a new one and return. + ACE_NEW_RETURN (tss->leader_follower_condition_variable_, + ACE_SYNCH_CONDITION (this->leader_follower ().lock ()), + 0); + } + else + { + // Return the condtion variable. + return tss->leader_follower_condition_variable_; + } +} + int TAO_ORB_Core::is_collocated (const TAO_MProfile& mprofile) { @@ -1659,7 +1681,8 @@ TAO_ORB_Core_TSS_Resources::TAO_ORB_Core_TSS_Resources (void) input_cdr_buffer_allocator_ (0), connection_cache_ (0), is_server_thread_ (0), - is_leader_thread_ (0) + is_leader_thread_ (0), + leader_follower_condition_variable_ (0) { } @@ -1694,6 +1717,9 @@ TAO_ORB_Core_TSS_Resources::~TAO_ORB_Core_TSS_Resources (void) // unimplemented delete this->connection_cache_; this->connection_cache_ = 0; + + delete this->leader_follower_condition_variable_; + this->leader_follower_condition_variable_ = 0; } // **************************************************************** diff --git a/TAO/tao/ORB_Core.h b/TAO/tao/ORB_Core.h index 204edf80c8c..80b018eee6e 100644 --- a/TAO/tao/ORB_Core.h +++ b/TAO/tao/ORB_Core.h @@ -96,6 +96,9 @@ public: int is_leader_thread_; // Is this thread a leader for this ORB? + + ACE_SYNCH_CONDITION* leader_follower_condition_variable_; + // Condition variable for the leader follower model. }; // **************************************************************** @@ -404,10 +407,14 @@ public: // Obtain the TSS resources of this orb. TAO_Leader_Follower &leader_follower (void); - // Get access to the leader_follower class + // Get access to the leader_follower class. int run (ACE_Time_Value *tv, int break_on_timeouts); - // Run the event loop + // Run the event loop. + + ACE_SYNCH_CONDITION* leader_follower_condition_variable (void); + // Condition variable used in the Leader Follower Wait Strategy, on + // which the follower thread blocks. protected: int set_iiop_endpoint (int dotted_decimal_addresses, diff --git a/TAO/tao/Pluggable.cpp b/TAO/tao/Pluggable.cpp index 77953791ddd..acdddf5eb77 100644 --- a/TAO/tao/Pluggable.cpp +++ b/TAO/tao/Pluggable.cpp @@ -258,6 +258,12 @@ TAO_Transport::reply_received (const CORBA::ULong request_id) return this->tms ()->reply_received (request_id); } +ACE_SYNCH_CONDITION * +TAO_Transport::leader_follower_condition_variable (void) +{ + return this->wait_strategy ()->leader_follower_condition_variable (); +} + void TAO_Transport::start_request (TAO_ORB_Core *, const TAO_Profile *, diff --git a/TAO/tao/Pluggable.h b/TAO/tao/Pluggable.h index 5e1a6300b8b..64eb1068dbf 100644 --- a/TAO/tao/Pluggable.h +++ b/TAO/tao/Pluggable.h @@ -199,6 +199,11 @@ public: // Check with the TMS whether the reply has been receieved for the // request with . + virtual ACE_SYNCH_CONDITION *leader_follower_condition_variable (void); + // Return the TSS leader follower condition variable used in the + // Wait Strategy. Muxed Leader Follower implementation returns a + // valid condition variable, others return 0. + protected: CORBA::ULong tag_; // IOP protocol tag. diff --git a/TAO/tao/Reply_Dispatcher.cpp b/TAO/tao/Reply_Dispatcher.cpp index 2424db07777..2b494d61c3d 100644 --- a/TAO/tao/Reply_Dispatcher.cpp +++ b/TAO/tao/Reply_Dispatcher.cpp @@ -25,6 +25,12 @@ TAO_Reply_Dispatcher::message_state (void) const return 0; } +void +TAO_Reply_Dispatcher::leader_follower_condition_variable (TAO_Transport *) +{ + // no-op. +} + // ********************************************************************* // Constructor. @@ -33,7 +39,8 @@ TAO_Synch_Reply_Dispatcher::TAO_Synch_Reply_Dispatcher (TAO_ORB_Core *orb_core) reply_cdr_ (orb_core->create_input_cdr_data_block (ACE_CDR::DEFAULT_BUFSIZE), TAO_ENCAP_BYTE_ORDER, orb_core), - reply_received_ (0) + reply_received_ (0), + leader_follower_condition_variable_ (0) { } @@ -65,6 +72,15 @@ TAO_Synch_Reply_Dispatcher::dispatch_reply (CORBA::ULong reply_status, // Steal the buffer so that no copying is done. this->reply_cdr_.reset (message_state->cdr.steal_contents (), message_state->cdr.byte_order ()); + + + // If condition variable is present, signal it. + if (this->leader_follower_condition_variable_ != 0) + { + // @@ Carlos: Should we apply lock here? (Alex). + (void) this->leader_follower_condition_variable_->signal (); + } + return 1; } @@ -86,6 +102,13 @@ TAO_Synch_Reply_Dispatcher::reply_received (void) return reply_received_; } +void +TAO_Synch_Reply_Dispatcher::leader_follower_condition_variable (TAO_Transport *transport) +{ + this->leader_follower_condition_variable_ = + transport->leader_follower_condition_variable (); +} + // ********************************************************************* #if defined (TAO_HAS_CORBA_MESSAGING) && defined (TAO_POLLER) // Constructor. diff --git a/TAO/tao/Reply_Dispatcher.h b/TAO/tao/Reply_Dispatcher.h index e8039931313..b66a73c1d84 100644 --- a/TAO/tao/Reply_Dispatcher.h +++ b/TAO/tao/Reply_Dispatcher.h @@ -50,6 +50,11 @@ public: virtual TAO_GIOP_Message_State *message_state (void) const; // Get the Message State into which the reply has been read. + + virtual void leader_follower_condition_variable (TAO_Transport *); + // Obtain the condition variable used in the Leader Follower Wait + // Strategy. This is valid only for the synchronous reply dispatcher + // and only when the Leader Follower wait strategy is used. }; // ********************************************************************* @@ -100,6 +105,10 @@ public: // reply will be dispatched as soon as it is available and the // dispatcher will go away immediately after that. + virtual void leader_follower_condition_variable (TAO_Transport *); + // Obtain the condition variable used in the Leader Follower Wait + // Strategy. + private: CORBA::ULong reply_status_; // Reply or LocateReply status. @@ -112,14 +121,18 @@ private: TAO_GIOP_Message_State *message_state_; // CDR stream for reading the input. - // @@ Carlos : message_state should go away. All we need is the reply - // cdr. Is that rite? (Alex). + // @@ Carlos : message_state should go away. All we need is the + // reply cdr. Is that rite? (Alex). TAO_InputCDR reply_cdr_; // CDR where the reply message is placed. int reply_received_; - // Flag that indicates the reply has been received. + // Flag that indicates the reply has been received. + + ACE_SYNCH_CONDITION *leader_follower_condition_variable_; + // Condition variable used by the leader to notify the follower + // about the availability of the response. }; // ********************************************************************* diff --git a/TAO/tao/UIOP_Connect.cpp b/TAO/tao/UIOP_Connect.cpp index 1da2cb91ea8..07aed2a3895 100644 --- a/TAO/tao/UIOP_Connect.cpp +++ b/TAO/tao/UIOP_Connect.cpp @@ -70,12 +70,7 @@ TAO_UIOP_Server_Connection_Handler::TAO_UIOP_Server_Connection_Handler (ACE_Thre : TAO_UIOP_Handler_Base (t), transport_ (this, 0), orb_core_ (0), - tss_resources_ (0), - // This will bomb if get called. But this constructor shouldnt be - // called anyway. - input_cdr_ (orb_core_->create_input_cdr_data_block (ACE_CDR::DEFAULT_BUFSIZE), - TAO_ENCAP_BYTE_ORDER, - orb_core_) + tss_resources_ (0) { // This constructor should *never* get called, it is just here to // make the compiler happy: the default implementation of the @@ -89,10 +84,7 @@ TAO_UIOP_Server_Connection_Handler::TAO_UIOP_Server_Connection_Handler (TAO_ORB_ : TAO_UIOP_Handler_Base (orb_core), transport_ (this, orb_core), orb_core_ (orb_core), - tss_resources_ (orb_core->get_tss_resources ()), - input_cdr_ (orb_core->create_input_cdr_data_block (ACE_CDR::DEFAULT_BUFSIZE), - TAO_ENCAP_BYTE_ORDER, - orb_core) + tss_resources_ (orb_core->get_tss_resources ()) { } @@ -246,15 +238,14 @@ TAO_UIOP_Server_Connection_Handler::handle_input (ACE_HANDLE) TAO_GIOP_Version giop_version = this->transport_.message_state_.giop_version; // Steal the input CDR from the message state. - TAO_InputCDR input_cdr (this->transport_.message_state_.cdr, - this->transport_.message_state_.cdr.length ()); + TAO_InputCDR input_cdr (this->transport_.message_state_.cdr); // Reset the message state. this->transport_.message_state_.reset (); result = TAO_GIOP::process_server_message (this->transport (), this->orb_core_, - this->input_cdr_, + input_cdr, message_type, giop_version); diff --git a/TAO/tao/UIOP_Connect.h b/TAO/tao/UIOP_Connect.h index 74de6a963a1..ec1296023e6 100644 --- a/TAO/tao/UIOP_Connect.h +++ b/TAO/tao/UIOP_Connect.h @@ -150,12 +150,6 @@ protected: TAO_ORB_Core_TSS_Resources *tss_resources_; // Cached tss resources of the ORB that activated this object. - - TAO_InputCDR input_cdr_; - // CDR used to steal the input cdr contents from the message - // state. This is done so that we can reset the message state - // before making the upcall. This makes the transport to handle the - // Nestedupcall requests coming on the same socket. }; #if defined (__ACE_INLINE__) diff --git a/TAO/tao/Wait_Strategy.cpp b/TAO/tao/Wait_Strategy.cpp index e9f1904bd76..476c1163de3 100644 --- a/TAO/tao/Wait_Strategy.cpp +++ b/TAO/tao/Wait_Strategy.cpp @@ -20,7 +20,13 @@ TAO_Wait_Strategy::~TAO_Wait_Strategy (void) int TAO_Wait_Strategy::sending_request (TAO_ORB_Core * /* orb_core */, - int /* two_way */) + int /* two_way */) +{ + return 0; +} + +ACE_SYNCH_CONDITION * +TAO_Wait_Strategy::leader_follower_condition_variable (void) { return 0; } @@ -193,7 +199,7 @@ TAO_Exclusive_Wait_On_Leader_Follower::sending_request (TAO_ORB_Core *orb_core, int TAO_Exclusive_Wait_On_Leader_Follower::wait (ACE_Time_Value *max_wait_time, - int &) + int &) { // Cache the ORB core, it won't change and is used multiple times // below: @@ -490,13 +496,20 @@ TAO_Muxed_Wait_On_Leader_Follower::~TAO_Muxed_Wait_On_Leader_Follower (void) { } -// @@ Why do we need and the flag? is -// with the object and flag wont make sense -// at this level since this is common for AMI also. (Alex). int TAO_Muxed_Wait_On_Leader_Follower::sending_request (TAO_ORB_Core *orb_core, int two_way) { + // Register the handler. + // @@ We could probably move this somewhere else, and remove this + // function totally. (Alex). + this->transport_->register_handler (); + + // Send the request. + int result = + this->TAO_Wait_Strategy::sending_request (orb_core, + two_way); + return 0; } @@ -504,14 +517,214 @@ int TAO_Muxed_Wait_On_Leader_Follower::wait (ACE_Time_Value *max_wait_time, int &reply_received) { - return 0; + // Cache the ORB core, it won't change and is used multiple times + // below: + TAO_ORB_Core* orb_core = + this->transport_->orb_core (); + + TAO_Leader_Follower& leader_follower = + orb_core->leader_follower (); + + // Obtain the lock. + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, + leader_follower.lock (), -1); + + leader_follower.set_client_thread (); + + ACE_Countdown_Time countdown (max_wait_time); + + // Check if there is a leader, but the leader is not us + if (leader_follower.leader_available () + && !leader_follower.is_leader_thread ()) + { + // = Wait as a follower. + + // ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - wait (follower) on <%x>\n", + // this->transport_)); + + // Grab the condtion variable. + ACE_SYNCH_CONDITION* cond = + orb_core->leader_follower_condition_variable (); + + // Add ourselves to the list, do it only once because we can + // wake up multiple times from the CV loop. + if (leader_follower.add_follower (cond) == -1) + ACE_ERROR ((LM_ERROR, + "TAO (%P|%t) TAO_Muxex_Wait_On_Leader_Follower::wait - " + "add_follower failed for <%x>\n", + cond)); + + while (!reply_received && + leader_follower.leader_available ()) + { + if (max_wait_time == 0) + { + if (cond == 0 || cond->wait () == -1) + return -1; + } + else + { + countdown.update (); + ACE_Time_Value tv = ACE_OS::gettimeofday (); + tv += *max_wait_time; + if (cond == 0 || cond->wait (&tv) == -1) + return -1; + } + } + + countdown.update (); + if (leader_follower.remove_follower (cond) == -1) + ACE_ERROR ((LM_ERROR, + "TAO (%P|%t) TAO_Muxed_Wait_On_Leader_Follower::wait - " + "remove_follower failed for <%x>\n", cond)); + + // ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - done (follower:%d) on <%x>\n", + // this->reply_received_, this->transport_)); + + // Now somebody woke us up to become a leader or to handle + // our input. We are already removed from the follower queue. + + if (reply_received == 1) + return 0; + + // FALLTHROUGH + // We only get here if we woke up but the reply is not complete + // yet, time to assume the leader role.... + // i.e. ACE_ASSERT (this->reply_received_ == 0); + } + + // = Leader Code. + + // The only way to reach this point is if we must become the leader, + // because there is no leader or we have to update to a leader or we + // are doing nested upcalls in this case we do increase the refcount + // on the leader in TAO_ORB_Core. + + // This might increase the refcount of the leader. + leader_follower.set_leader_thread (); + + int result = 1; + + { + ACE_GUARD_RETURN (ACE_Reverse_Lock, rev_mon, + leader_follower.reverse_lock (), -1); + + // @@ Do we need to do this? + // Become owner of the reactor. + orb_core->reactor ()->owner (ACE_Thread::self ()); + + // Run the reactor event loop. + + // ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - wait (leader) on <%x>\n", + // this->transport_)); + + while (result > 0 && reply_received == 0) + result = orb_core->reactor ()->handle_events (max_wait_time); + + // ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - done (leader) on <%x>\n", + // this->transport_)); + } + + // Wake up the next leader, we cannot do that in handle_input, + // because the woken up thread would try to get into + // handle_events, which is at the time in handle_input still + // occupied. But do it before checking the error in , even + // if there is an error in our input we should continue running the + // loop in another thread. + + leader_follower.reset_leader_thread (); + leader_follower.reset_client_thread (); + + if (leader_follower.elect_new_leader () == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "TAO:%N:%l:(%P|%t):TAO_Muxed_Wait_On_Leader_Follower::send_request: " + "Failed to unset the leader and wake up a new follower.\n"), + -1); + + if (result == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "TAO:%N:%l:(%P|%t):TAO_Muxed_Wait_On_Leader_Follower::wait: " + "handle_events failed.\n"), + -1); + + // Return an error if there was a problem receiving the reply... + if (max_wait_time != 0) + { + if (reply_received != 1 + && *max_wait_time == ACE_Time_Value::zero) + { + result = -1; + errno = ETIME; + } + } + else + { + result = 0; + if (reply_received == -1) + { + result = -1; + } + } + + return result; } // Handle the input. Return -1 on error, 0 on success. int TAO_Muxed_Wait_On_Leader_Follower::handle_input (void) { - return 0; + // Cache the ORB core, it won't change and is used multiple times + // below: + TAO_ORB_Core* orb_core = + this->transport_->orb_core (); + + // Obtain the lock. + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, + orb_core->leader_follower ().lock (), + -1); + + // ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - reading reply <%x>\n", + // this->transport_)); + + // Receive any data that is available, without blocking... + int result = this->transport_->handle_client_input (0); + + // Data was read, but there the reply has not been completely + // received... + if (result == 0) + return 0; + + if (result == -1) + { + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Wait_On_LF::handle_input, " + "handle_client_input == -1\n")); + // this->reply_received_ = -1; + } + + if (result == 1) + { + // Change the result value to something that the Reactor can + // understand + result = 0; + + // reply_received_ = 1; + // This would have been done by the dispatch already. + } + + // Wake up any threads waiting for this message, either because the + // message failed or because we really received it. + // this->wake_up (); + // will be done in the + + return result; +} + +ACE_SYNCH_CONDITION * +TAO_Muxed_Wait_On_Leader_Follower::leader_follower_condition_variable (void) +{ + return this->transport_->orb_core ()->leader_follower_condition_variable (); } // ********************************************************************* diff --git a/TAO/tao/Wait_Strategy.h b/TAO/tao/Wait_Strategy.h index 8eaa4b15d30..12a690c82db 100644 --- a/TAO/tao/Wait_Strategy.h +++ b/TAO/tao/Wait_Strategy.h @@ -60,6 +60,11 @@ public: // Register the handler with the Reactor if it makes sense for the // strategy. + virtual ACE_SYNCH_CONDITION *leader_follower_condition_variable (void); + // Return the TSS leader follower condition variable used in the + // Wait Strategy. Muxed Leader Follower implementation returns a + // valid condition variable, others return 0. + protected: TAO_Transport *transport_; // Transport object. @@ -67,7 +72,7 @@ protected: // @@ Alex: we should consider moving these classes to separate files, // that can minimize the footprint of systems that use only one of -// the strategies.... +// the strategies....(coryan). // ********************************************************************* @@ -223,10 +228,16 @@ public: virtual int sending_request (TAO_ORB_Core *orb_core, int two_way); + virtual int wait (ACE_Time_Value *max_wait_time, int &reply_received); + virtual int handle_input (void); + // virtual int register_handler (void); + + virtual ACE_SYNCH_CONDITION *leader_follower_condition_variable (void); + // TSS Leader follower condition variable. }; // ********************************************************************* -- cgit v1.2.1