summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoralex <alex@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1999-07-14 04:34:25 +0000
committeralex <alex@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1999-07-14 04:34:25 +0000
commit54f008679dc1c21f1cdbef430f6a0ed5270659b0 (patch)
treed3b011a43e2345513e3c1bb6ee39232cafd360dd
parente6fc07fb7be279af456894f8102846076fac051e (diff)
downloadATCD-54f008679dc1c21f1cdbef430f6a0ed5270659b0.tar.gz
ChangeLogTag : Tue Jul 13 23:32:05 1999 Alexander Babu Arulanthu <alex@cs.wustl.edu>
-rw-r--r--TAO/tao/ORB_Core.cpp25
-rw-r--r--TAO/tao/Reply_Dispatcher.cpp20
-rw-r--r--TAO/tao/Reply_Dispatcher.h3
-rw-r--r--TAO/tao/Wait_Strategy.cpp121
4 files changed, 128 insertions, 41 deletions
diff --git a/TAO/tao/ORB_Core.cpp b/TAO/tao/ORB_Core.cpp
index 0b912799d0e..01ccbb8a1af 100644
--- a/TAO/tao/ORB_Core.cpp
+++ b/TAO/tao/ORB_Core.cpp
@@ -1318,11 +1318,8 @@ TAO_ORB_Core::leader_follower_condition_variable (void)
ACE_SYNCH_CONDITION (this->leader_follower ().lock ()),
0);
}
- else
- {
- // Return the condtion variable.
- return tss->leader_follower_condition_variable_;
- }
+
+ return tss->leader_follower_condition_variable_;
}
int
@@ -1434,8 +1431,22 @@ TAO_Leader_Follower::get_next_follower (void)
if (iterator.first () == 0)
// means set is empty
return 0;
-
- return *iterator;
+
+ if (TAO_debug_level >= 4)
+ ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - next follower is %x\n",
+ *iterator));
+
+ ACE_SYNCH_CONDITION *cond = *iterator;
+
+ // We *must* remove it when we signal it so the same condition is
+ // not signalled for both wake up as a follower and as the next
+ // leader.
+ // The follower may not be there if the reply is received while the
+ // consumer is not yet waiting for it (i.e. it send the request but
+ // has not blocked to receive the reply yet)
+ (void) this->remove_follower (cond); // Ignore errors..
+
+ return cond;
}
// ****************************************************************
diff --git a/TAO/tao/Reply_Dispatcher.cpp b/TAO/tao/Reply_Dispatcher.cpp
index 2b494d61c3d..6bb9ee1751c 100644
--- a/TAO/tao/Reply_Dispatcher.cpp
+++ b/TAO/tao/Reply_Dispatcher.cpp
@@ -40,7 +40,8 @@ TAO_Synch_Reply_Dispatcher::TAO_Synch_Reply_Dispatcher (TAO_ORB_Core *orb_core)
TAO_ENCAP_BYTE_ORDER,
orb_core),
reply_received_ (0),
- leader_follower_condition_variable_ (0)
+ leader_follower_condition_variable_ (0),
+ orb_core_ (orb_core)
{
}
@@ -72,12 +73,23 @@ TAO_Synch_Reply_Dispatcher::dispatch_reply (CORBA::ULong reply_status,
// Steal the buffer so that no copying is done.
this->reply_cdr_.reset (message_state->cdr.steal_contents (),
message_state->cdr.byte_order ());
-
- // If condition variable is present, signal it.
+ // If condition variable is present, then we are doing leader
+ // follower model. Do all the nessary things.
if (this->leader_follower_condition_variable_ != 0)
{
- // @@ Carlos: Should we apply lock here? (Alex).
+ TAO_Leader_Follower& leader_follower =
+ this->orb_core_->leader_follower ();
+
+ // We *must* remove it when we signal it so the same condition
+ // is not signalled for both wake up as a follower and as the
+ // next leader.
+ // The follower may not be there if the reply is received while
+ // the consumer is not yet waiting for it (i.e. it send the
+ // request but has not blocked to receive the reply yet).
+ // Ignore errors.
+ (void) leader_follower.remove_follower (this->leader_follower_condition_variable_);
+
(void) this->leader_follower_condition_variable_->signal ();
}
diff --git a/TAO/tao/Reply_Dispatcher.h b/TAO/tao/Reply_Dispatcher.h
index b66a73c1d84..3298b12b662 100644
--- a/TAO/tao/Reply_Dispatcher.h
+++ b/TAO/tao/Reply_Dispatcher.h
@@ -133,6 +133,9 @@ private:
ACE_SYNCH_CONDITION *leader_follower_condition_variable_;
// Condition variable used by the leader to notify the follower
// about the availability of the response.
+
+ TAO_ORB_Core *orb_core_;
+ // Cache the ORB Core pointer.
};
// *********************************************************************
diff --git a/TAO/tao/Wait_Strategy.cpp b/TAO/tao/Wait_Strategy.cpp
index cc9c649ec37..e29b8b1b035 100644
--- a/TAO/tao/Wait_Strategy.cpp
+++ b/TAO/tao/Wait_Strategy.cpp
@@ -237,12 +237,19 @@ TAO_Exclusive_Wait_On_Leader_Follower::wait (ACE_Time_Value *max_wait_time,
this->cond_response_available ();
// Add ourselves to the list, do it only once because we can
- // wake up multiple times from the CV loop
- if (leader_follower.add_follower (cond) == -1)
- ACE_ERROR ((LM_ERROR,
- "TAO (%P|%t) TAO_Exclusive_Wait_On_Leader_Follower::wait - "
- "add_follower failed for <%x>\n",
- cond));
+ // wake up multiple times from the CV loop. And only do it if
+ // the reply has not been received (it could have arrived while
+ // we were preparing to receive it).
+
+ if (!this->reply_received_
+ && leader_follower.leader_available ())
+ {
+ if (leader_follower.add_follower (cond) == -1)
+ ACE_ERROR ((LM_ERROR,
+ "TAO (%P|%t) TAO_Wait_On_Leader_Follower::wait - "
+ "add_follower failed for <%x>\n",
+ cond));
+ }
while (!this->reply_received_ &&
leader_follower.leader_available ())
@@ -263,10 +270,16 @@ TAO_Exclusive_Wait_On_Leader_Follower::wait (ACE_Time_Value *max_wait_time,
}
countdown.update ();
+
+#if 0
+ // Cannot remove the follower here, we *must* remove it when we
+ // signal it so the same condition is not signalled for both
+ // wake up as a follower and as the next leader.
if (leader_follower.remove_follower (cond) == -1)
ACE_ERROR ((LM_ERROR,
"TAO (%P|%t) TAO_Exclusive_Wait_On_Leader_Follower::wait - "
"remove_follower failed for <%x>\n", cond));
+#endif /* 0 */
//ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - done (follower:%d) on <%x>\n",
//this->reply_received_, this->transport_));
@@ -479,6 +492,21 @@ TAO_Exclusive_Wait_On_Leader_Follower::wake_up (void)
ACE_SYNCH_CONDITION* cond =
this->cond_response_available ();
+ //if (TAO_debug_level > 0)
+ //ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - wake up follower %x\n",
+ // cond));
+
+ TAO_Leader_Follower& leader_follower =
+ this->transport_->orb_core ()->leader_follower ();
+
+ // We *must* remove it when we signal it so the same condition is
+ // not signalled for both wake up as a follower and as the next
+ // leader.
+ // The follower may not be there if the reply is received while the
+ // consumer is not yet waiting for it (i.e. it send the request but
+ // has not blocked to receive the reply yet)
+ (void) leader_follower.remove_follower (cond); // Ignore errors
+
if (cond != 0)
(void) cond->signal ();
}
@@ -539,21 +567,30 @@ TAO_Muxed_Wait_On_Leader_Follower::wait (ACE_Time_Value *max_wait_time,
{
// = Wait as a follower.
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - wait (follower) on <%x>\n",
- this->transport_));
+ if (TAO_debug_level >= 5)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - wait (follower) on <%x>\n",
+ this->transport_));
// Grab the condtion variable.
ACE_SYNCH_CONDITION* cond =
orb_core->leader_follower_condition_variable ();
// Add ourselves to the list, do it only once because we can
- // wake up multiple times from the CV loop.
- if (leader_follower.add_follower (cond) == -1)
- ACE_ERROR ((LM_ERROR,
- "TAO (%P|%t) TAO_Muxex_Wait_On_Leader_Follower::wait - "
- "add_follower failed for <%x>\n",
- cond));
+ // wake up multiple times from the CV loop. And only do it if
+ // the reply has not been received (it could have arrived while
+ // we were preparing to receive it).
+
+ if (!reply_received
+ && leader_follower.leader_available ())
+ {
+ if (leader_follower.add_follower (cond) == -1)
+ ACE_ERROR ((LM_ERROR,
+ "TAO (%P|%t) TAO_Muxed_Wait_On_Leader_Follower::wait - "
+ "add_follower failed for <%x>\n",
+ cond));
+ }
+
while (!reply_received &&
leader_follower.leader_available ())
@@ -561,7 +598,14 @@ TAO_Muxed_Wait_On_Leader_Follower::wait (ACE_Time_Value *max_wait_time,
if (max_wait_time == 0)
{
if (cond == 0 || cond->wait () == -1)
- return -1;
+ {
+ if (TAO_debug_level >= 5)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - wait (follower) on <%x> "
+ "cond == 0 || cond->wait () == -1 : cond = %d\n",
+ this->transport_, (cond == 0) ? 0 : cond));
+ return -1;
+ }
}
else
{
@@ -569,20 +613,34 @@ TAO_Muxed_Wait_On_Leader_Follower::wait (ACE_Time_Value *max_wait_time,
ACE_Time_Value tv = ACE_OS::gettimeofday ();
tv += *max_wait_time;
if (cond == 0 || cond->wait (&tv) == -1)
- return -1;
+ {
+ if (TAO_debug_level >= 5)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - wait (follower) on <%x> "
+ "cond == 0 || cond->wait (tv) == -1\n",
+ this->transport_));
+ return -1;
+ }
}
}
countdown.update ();
+
+#if 0
+ // Cannot remove the follower here, we *must* remove it when we
+ // signal it so the same condition is not signalled for both
+ // wake up as a follower and as the next leader.
if (leader_follower.remove_follower (cond) == -1)
ACE_ERROR ((LM_ERROR,
"TAO (%P|%t) TAO_Muxed_Wait_On_Leader_Follower::wait - "
"remove_follower failed for <%x>\n", cond));
+#endif /* 0 */
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - done (follower) on <%x>, reply_received %d\n",
- reply_received, this->transport_));
-
+ if (TAO_debug_level >= 5)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - done (follower) on <%x>, reply_received %d\n",
+ this->transport_, reply_received));
+
// Now somebody woke us up to become a leader or to handle
// our input. We are already removed from the follower queue.
@@ -617,16 +675,18 @@ TAO_Muxed_Wait_On_Leader_Follower::wait (ACE_Time_Value *max_wait_time,
// Run the reactor event loop.
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - wait (leader):to enter reactor event loop on <%x>\n",
- this->transport_));
+ if (TAO_debug_level >= 5)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - wait (leader):to enter reactor event loop on <%x>\n",
+ this->transport_));
while (result > 0 && reply_received == 0)
result = orb_core->reactor ()->handle_events (max_wait_time);
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - wait : (leader) : done with reactor event loop on <%x>\n",
- this->transport_));
+ if (TAO_debug_level >= 5)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - wait : (leader) : done with reactor event loop on <%x>\n",
+ this->transport_));
}
// Wake up the next leader, we cannot do that in handle_input,
@@ -687,9 +747,10 @@ TAO_Muxed_Wait_On_Leader_Follower::handle_input (void)
orb_core->leader_follower ().lock (),
-1);
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - reading reply on <%x>\n",
- this->transport_));
+ if (TAO_debug_level >= 5)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - reading reply on <%x>\n",
+ this->transport_));
// Receive any data that is available, without blocking...
int result = this->transport_->handle_client_input (0);