diff options
-rw-r--r-- | TAO/tao/ORB_Core.cpp | 25 | ||||
-rw-r--r-- | TAO/tao/Reply_Dispatcher.cpp | 20 | ||||
-rw-r--r-- | TAO/tao/Reply_Dispatcher.h | 3 | ||||
-rw-r--r-- | TAO/tao/Wait_Strategy.cpp | 121 |
4 files changed, 128 insertions, 41 deletions
diff --git a/TAO/tao/ORB_Core.cpp b/TAO/tao/ORB_Core.cpp index 0b912799d0e..01ccbb8a1af 100644 --- a/TAO/tao/ORB_Core.cpp +++ b/TAO/tao/ORB_Core.cpp @@ -1318,11 +1318,8 @@ TAO_ORB_Core::leader_follower_condition_variable (void) ACE_SYNCH_CONDITION (this->leader_follower ().lock ()), 0); } - else - { - // Return the condtion variable. - return tss->leader_follower_condition_variable_; - } + + return tss->leader_follower_condition_variable_; } int @@ -1434,8 +1431,22 @@ TAO_Leader_Follower::get_next_follower (void) if (iterator.first () == 0) // means set is empty return 0; - - return *iterator; + + if (TAO_debug_level >= 4) + ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - next follower is %x\n", + *iterator)); + + ACE_SYNCH_CONDITION *cond = *iterator; + + // We *must* remove it when we signal it so the same condition is + // not signalled for both wake up as a follower and as the next + // leader. + // The follower may not be there if the reply is received while the + // consumer is not yet waiting for it (i.e. it send the request but + // has not blocked to receive the reply yet) + (void) this->remove_follower (cond); // Ignore errors.. + + return cond; } // **************************************************************** diff --git a/TAO/tao/Reply_Dispatcher.cpp b/TAO/tao/Reply_Dispatcher.cpp index 2b494d61c3d..6bb9ee1751c 100644 --- a/TAO/tao/Reply_Dispatcher.cpp +++ b/TAO/tao/Reply_Dispatcher.cpp @@ -40,7 +40,8 @@ TAO_Synch_Reply_Dispatcher::TAO_Synch_Reply_Dispatcher (TAO_ORB_Core *orb_core) TAO_ENCAP_BYTE_ORDER, orb_core), reply_received_ (0), - leader_follower_condition_variable_ (0) + leader_follower_condition_variable_ (0), + orb_core_ (orb_core) { } @@ -72,12 +73,23 @@ TAO_Synch_Reply_Dispatcher::dispatch_reply (CORBA::ULong reply_status, // Steal the buffer so that no copying is done. this->reply_cdr_.reset (message_state->cdr.steal_contents (), message_state->cdr.byte_order ()); - - // If condition variable is present, signal it. + // If condition variable is present, then we are doing leader + // follower model. Do all the nessary things. if (this->leader_follower_condition_variable_ != 0) { - // @@ Carlos: Should we apply lock here? (Alex). + TAO_Leader_Follower& leader_follower = + this->orb_core_->leader_follower (); + + // We *must* remove it when we signal it so the same condition + // is not signalled for both wake up as a follower and as the + // next leader. + // The follower may not be there if the reply is received while + // the consumer is not yet waiting for it (i.e. it send the + // request but has not blocked to receive the reply yet). + // Ignore errors. + (void) leader_follower.remove_follower (this->leader_follower_condition_variable_); + (void) this->leader_follower_condition_variable_->signal (); } diff --git a/TAO/tao/Reply_Dispatcher.h b/TAO/tao/Reply_Dispatcher.h index b66a73c1d84..3298b12b662 100644 --- a/TAO/tao/Reply_Dispatcher.h +++ b/TAO/tao/Reply_Dispatcher.h @@ -133,6 +133,9 @@ private: ACE_SYNCH_CONDITION *leader_follower_condition_variable_; // Condition variable used by the leader to notify the follower // about the availability of the response. + + TAO_ORB_Core *orb_core_; + // Cache the ORB Core pointer. }; // ********************************************************************* diff --git a/TAO/tao/Wait_Strategy.cpp b/TAO/tao/Wait_Strategy.cpp index cc9c649ec37..e29b8b1b035 100644 --- a/TAO/tao/Wait_Strategy.cpp +++ b/TAO/tao/Wait_Strategy.cpp @@ -237,12 +237,19 @@ TAO_Exclusive_Wait_On_Leader_Follower::wait (ACE_Time_Value *max_wait_time, this->cond_response_available (); // Add ourselves to the list, do it only once because we can - // wake up multiple times from the CV loop - if (leader_follower.add_follower (cond) == -1) - ACE_ERROR ((LM_ERROR, - "TAO (%P|%t) TAO_Exclusive_Wait_On_Leader_Follower::wait - " - "add_follower failed for <%x>\n", - cond)); + // wake up multiple times from the CV loop. And only do it if + // the reply has not been received (it could have arrived while + // we were preparing to receive it). + + if (!this->reply_received_ + && leader_follower.leader_available ()) + { + if (leader_follower.add_follower (cond) == -1) + ACE_ERROR ((LM_ERROR, + "TAO (%P|%t) TAO_Wait_On_Leader_Follower::wait - " + "add_follower failed for <%x>\n", + cond)); + } while (!this->reply_received_ && leader_follower.leader_available ()) @@ -263,10 +270,16 @@ TAO_Exclusive_Wait_On_Leader_Follower::wait (ACE_Time_Value *max_wait_time, } countdown.update (); + +#if 0 + // Cannot remove the follower here, we *must* remove it when we + // signal it so the same condition is not signalled for both + // wake up as a follower and as the next leader. if (leader_follower.remove_follower (cond) == -1) ACE_ERROR ((LM_ERROR, "TAO (%P|%t) TAO_Exclusive_Wait_On_Leader_Follower::wait - " "remove_follower failed for <%x>\n", cond)); +#endif /* 0 */ //ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - done (follower:%d) on <%x>\n", //this->reply_received_, this->transport_)); @@ -479,6 +492,21 @@ TAO_Exclusive_Wait_On_Leader_Follower::wake_up (void) ACE_SYNCH_CONDITION* cond = this->cond_response_available (); + //if (TAO_debug_level > 0) + //ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - wake up follower %x\n", + // cond)); + + TAO_Leader_Follower& leader_follower = + this->transport_->orb_core ()->leader_follower (); + + // We *must* remove it when we signal it so the same condition is + // not signalled for both wake up as a follower and as the next + // leader. + // The follower may not be there if the reply is received while the + // consumer is not yet waiting for it (i.e. it send the request but + // has not blocked to receive the reply yet) + (void) leader_follower.remove_follower (cond); // Ignore errors + if (cond != 0) (void) cond->signal (); } @@ -539,21 +567,30 @@ TAO_Muxed_Wait_On_Leader_Follower::wait (ACE_Time_Value *max_wait_time, { // = Wait as a follower. - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - wait (follower) on <%x>\n", - this->transport_)); + if (TAO_debug_level >= 5) + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - wait (follower) on <%x>\n", + this->transport_)); // Grab the condtion variable. ACE_SYNCH_CONDITION* cond = orb_core->leader_follower_condition_variable (); // Add ourselves to the list, do it only once because we can - // wake up multiple times from the CV loop. - if (leader_follower.add_follower (cond) == -1) - ACE_ERROR ((LM_ERROR, - "TAO (%P|%t) TAO_Muxex_Wait_On_Leader_Follower::wait - " - "add_follower failed for <%x>\n", - cond)); + // wake up multiple times from the CV loop. And only do it if + // the reply has not been received (it could have arrived while + // we were preparing to receive it). + + if (!reply_received + && leader_follower.leader_available ()) + { + if (leader_follower.add_follower (cond) == -1) + ACE_ERROR ((LM_ERROR, + "TAO (%P|%t) TAO_Muxed_Wait_On_Leader_Follower::wait - " + "add_follower failed for <%x>\n", + cond)); + } + while (!reply_received && leader_follower.leader_available ()) @@ -561,7 +598,14 @@ TAO_Muxed_Wait_On_Leader_Follower::wait (ACE_Time_Value *max_wait_time, if (max_wait_time == 0) { if (cond == 0 || cond->wait () == -1) - return -1; + { + if (TAO_debug_level >= 5) + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - wait (follower) on <%x> " + "cond == 0 || cond->wait () == -1 : cond = %d\n", + this->transport_, (cond == 0) ? 0 : cond)); + return -1; + } } else { @@ -569,20 +613,34 @@ TAO_Muxed_Wait_On_Leader_Follower::wait (ACE_Time_Value *max_wait_time, ACE_Time_Value tv = ACE_OS::gettimeofday (); tv += *max_wait_time; if (cond == 0 || cond->wait (&tv) == -1) - return -1; + { + if (TAO_debug_level >= 5) + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - wait (follower) on <%x> " + "cond == 0 || cond->wait (tv) == -1\n", + this->transport_)); + return -1; + } } } countdown.update (); + +#if 0 + // Cannot remove the follower here, we *must* remove it when we + // signal it so the same condition is not signalled for both + // wake up as a follower and as the next leader. if (leader_follower.remove_follower (cond) == -1) ACE_ERROR ((LM_ERROR, "TAO (%P|%t) TAO_Muxed_Wait_On_Leader_Follower::wait - " "remove_follower failed for <%x>\n", cond)); +#endif /* 0 */ - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - done (follower) on <%x>, reply_received %d\n", - reply_received, this->transport_)); - + if (TAO_debug_level >= 5) + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - done (follower) on <%x>, reply_received %d\n", + this->transport_, reply_received)); + // Now somebody woke us up to become a leader or to handle // our input. We are already removed from the follower queue. @@ -617,16 +675,18 @@ TAO_Muxed_Wait_On_Leader_Follower::wait (ACE_Time_Value *max_wait_time, // Run the reactor event loop. - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - wait (leader):to enter reactor event loop on <%x>\n", - this->transport_)); + if (TAO_debug_level >= 5) + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - wait (leader):to enter reactor event loop on <%x>\n", + this->transport_)); while (result > 0 && reply_received == 0) result = orb_core->reactor ()->handle_events (max_wait_time); - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - wait : (leader) : done with reactor event loop on <%x>\n", - this->transport_)); + if (TAO_debug_level >= 5) + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - wait : (leader) : done with reactor event loop on <%x>\n", + this->transport_)); } // Wake up the next leader, we cannot do that in handle_input, @@ -687,9 +747,10 @@ TAO_Muxed_Wait_On_Leader_Follower::handle_input (void) orb_core->leader_follower ().lock (), -1); - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - reading reply on <%x>\n", - this->transport_)); + if (TAO_debug_level >= 5) + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - reading reply on <%x>\n", + this->transport_)); // Receive any data that is available, without blocking... int result = this->transport_->handle_client_input (0); |