summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormk1 <mk1@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1998-07-12 21:19:27 +0000
committermk1 <mk1@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1998-07-12 21:19:27 +0000
commitd8877a0192a37586185bdd3cfe7609b62ddedd61 (patch)
tree5d0fb41810567f9915823e08b11b6561f7bff879
parent9d981ec55ff95442c8daaf643bf7fff2ee3ca3d6 (diff)
downloadATCD-d8877a0192a37586185bdd3cfe7609b62ddedd61.tar.gz
New Branch for the multithreaded client ORB
-rw-r--r--TAO/tao/Connect.cpp306
-rw-r--r--TAO/tao/GIOP.cpp15
-rw-r--r--TAO/tao/ORB.cpp68
-rw-r--r--TAO/tao/ORB.h3
-rw-r--r--TAO/tao/ORB_Core.cpp140
-rw-r--r--TAO/tao/ORB_Core.h55
6 files changed, 513 insertions, 74 deletions
diff --git a/TAO/tao/Connect.cpp b/TAO/tao/Connect.cpp
index c370caf1fb7..2cddc3c285a 100644
--- a/TAO/tao/Connect.cpp
+++ b/TAO/tao/Connect.cpp
@@ -402,6 +402,8 @@ TAO_Server_Connection_Handler::handle_input (ACE_HANDLE)
ACE_FUNCTION_TIMEPROBE (TAO_SERVER_CONNECTION_HANDLER_HANDLE_INPUT_START);
+
+
// @@ TODO This should take its memory from a specialized
// allocator. It is better to use a message block than a on stack
// buffer because we cannot minimize memory copies in that case.
@@ -490,8 +492,16 @@ TAO_Server_Connection_Handler::handle_input (ACE_HANDLE)
TAO_Client_Connection_Handler::TAO_Client_Connection_Handler (ACE_Thread_Manager *t)
: TAO_SVC_HANDLER (t, 0, 0),
- input_available_ (0)
+ input_available_ (0),
+ calling_thread_ (0)
+{
+ this->cond_response_available_ =
+ new ACE_SYNCH_CONDITION (TAO_ORB_Core_instance ()->leader_follower_lock ());
+}
+
+TAO_Client_Connection_Handler::~TAO_Client_Connection_Handler ()
{
+ delete this->cond_response_available_;
}
int
@@ -576,29 +586,118 @@ TAO_Client_Connection_Handler::send_request (TAO_OutputCDR &stream,
return -1;
if (is_twoway)
+ {
+ // remember in which thread the client connection handler was running
+ this->calling_thread_ = ACE_Thread::self ();
+ if (TAO_ORB_Core_instance ()->leader_follower_lock ().acquire() == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%P|%t) TAO_Client_Connection_Handler::send_request: "
+ "Failed to get the lock.\n"),
+ -1);
+
+ ACE_DEBUG ((LM_DEBUG,
+ "Client_Connection_Handler::send_request: (%d) starting\n",
+ ACE_Thread::self ()));
+
+ // check if there is a leader, but the leader is not us
+ if (TAO_ORB_Core_instance ()->leader_available ()
+ && !TAO_ORB_Core_instance ()->I_am_the_leader_thread ())
{
- // Go into a loop, waiting until it's safe to try to read
- // something on the soket. The handle_input() method doesn't
- // actualy do the read, though, proper behavior based on what is
- // read may be different if we're not using GIOP above here.
- // So, we leave the reading of the response to the caller of
- // this method, and simply insure that this method doesn't
- // return until such time as doing a recv() on the socket would
- // actually produce fruit.
- ACE_Reactor *r = TAO_ORB_Core_instance ()->reactor ();
-
- int ret = 0;
-
- while (ret != -1 && ! this->input_available_)
- ret = r->handle_events ();
-
- this->input_available_ = 0;
- // We can get events now, b/c we want them!
- r->resume_handler (this);
- // We're no longer expecting a response!
- this->expecting_response_ = 0;
+ // wait as long as no input is available and/or
+ // no leader is available
+ while (!this->input_available_
+ && TAO_ORB_Core_instance ()->leader_available ())
+ {
+ if (TAO_ORB_Core_instance ()->add_follower (this->cond_response_available_) == -1)
+ ACE_ERROR ((LM_ERROR,
+ "(%P|%t) TAO_Client_Connection_Handler::send_request: "
+ "Failed to add a follower thread\n"));
+ this->cond_response_available_->wait ();
+ }
+ // now somebody woke us up to become a leader or
+ // to handle our input. We are already removed from the
+ // follower queue
+ if (this->input_available_)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) Client_Connection_Handler::send_request: fake handle_input\n",
+ ACE_Thread::self ()));
+
+
+
+ // there is input waiting for me
+ TAO_ORB_Core_instance ()->leader_follower_lock ().release ();
+
+ int ret = 0; //this->handle_input (); // fake the handle_input
+ if (ret < 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "Client_Connection_Handler::send_request: (%d) "
+ "failure faking handle_input\n",
+ ACE_Thread::self ()));
+ TAO_ORB_Core_instance ()->reactor ()->remove_handler (this,
+ ACE_Event_Handler::ALL_EVENTS_MASK);
+ // failure handling
+ return -1;
+ }
+ /* else if (ret > 0)
+ // we have to reschedule, not implemented yet
+ */
+
+ // the following variables are safe, because we are not registered with
+ // the reactor any more.
+ this->input_available_ = 0;
+ this->expecting_response_ = 0;
+ this->calling_thread_ = 0;
+ return 0;
+ }
}
+ // become a leader, because there is no leader or we have to update to a leader
+ // or we are doing nested upcalls in this case we do increase the refcount
+ ACE_DEBUG ((LM_DEBUG,
+ "Client_Connection_Handler::send_request: (%d) become a leader\n",
+ ACE_Thread::self ()));
+
+ TAO_ORB_Core_instance ()->set_leader_thread ();
+ // this might increase the recount of the leader
+
+ if (TAO_ORB_Core_instance ()->leader_follower_lock ().release () == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%P|%t) TAO_Client_Connection_Handler::send_request: "
+ "Failed to release the lock.\n"),
+ -1);
+
+ ACE_Reactor *r = TAO_ORB_Core_instance ()->reactor ();
+ r->owner (ACE_Thread::self ());
+
+ int ret = 0;
+
+ while (ret != -1 && !this->input_available_)
+ ret = r->handle_events ();
+
+ if (ret == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%P|%t) TAO_Client_Connection_Handler::send_request: "
+ "handle_events failed.\n"),
+ -1);
+
+
+ // wake up the next leader, we cannot do that in handle_input,
+ // because the woken up thread would try to get into handle_events,
+ // which is at the time in handle_input still occupied.
+
+ if (TAO_ORB_Core_instance ()->unset_leader_wake_up_follower () == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%P|%t) TAO_Client_Connection_Handler::send_request: "
+ "Failed to unset the leader and wake up a new follower.\n"),
+ -1);
+
+ this->input_available_ = 0;
+ this->expecting_response_ = 0;
+ this->calling_thread_ = 0;
+ }
+
return 0;
}
@@ -607,54 +706,133 @@ TAO_Client_Connection_Handler::handle_input (ACE_HANDLE)
{
int retval = 0;
- if (this->expecting_response_)
+
+ TAO_ORB_Core_instance ()->leader_follower_lock ().acquire ();
+
+ if (!this->expecting_response_)
+ {
+ // we got something, but did not want
+ // @@ wake up an other thread, we are lost
+
+ // We're a client, so we're not expecting to see input. Still
+ // we better check what it is!
+ char ignored;
+ ssize_t ret;
+ ACE_Time_Value tv = ACE_Time_Value::zero;
+ retval = 0;
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) Client_Connection_Handler::handle_input: Handler (%d) "
+ "not expected response\n",
+ this->calling_thread_));
+ retval = -1;
+ if (this->calling_thread_ == 0)
{
- this->input_available_ = 1;
- // Temporarily remove ourself from notification so that if
- // another sub event loop is in effect still waiting for its
- // response, it doesn't spin tightly gobbling up CPU.
- TAO_ORB_Core_instance ()->reactor ()->suspend_handler (this);
+ ret = this->peer().recv (&ignored, sizeof ignored, MSG_PEEK, &tv);
+ retval = 0;
+ // if -1 is returned, the nested upcalls server crashes,
+ // if the tv value is not specified we will hang in a blocking read
}
- else
+ else if (ret = this->peer().recv (&ignored, sizeof ignored, MSG_PEEK) > 0)
{
- // We're a client, so we're not expecting to see input. Still
- // we better check what it is!
- char ignored;
- ssize_t ret = this->peer().recv (&ignored, sizeof ignored, MSG_PEEK);
-
- // We're not expecting input at this time, so we'll always
- // return -1 for now.
- retval = -1;
- switch (ret)
- {
- case -1:
- // Error...but we weren't expecting input, either...what
- // should we do?
- ACE_ERROR ((LM_WARNING,
- "Client_Connection_Handler::handle_input received "
- "error while reading unexpected input; closing connection on fd %d\n",
- this->peer().get_handle ()));
- break;
-
- case 1:
- // We weren't expecting input, so what should we do with it?
- // Log an error, and close the connection.
- ACE_ERROR ((LM_WARNING,
- "Client_Connection_Handler::handle_input received "
- "input while not expecting a response; closing connection on fd %d\n",
- this->peer().get_handle ()));
- break;
-
- case 0:
- // This is an EOF, so we will return -1 and let
- // handle_close() take over. As long as handle_close()
- // calls the Svc_Handler<>::handle_close(), the socket will
- // be shutdown properly.
- break;
- }
+ ret = this->peer().recv_n (&ignored, sizeof ignored);
}
- return retval;
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) Client_Connection_Handler::handle_input: Handler (%d) "
+ "ret = %d\n",
+ this->calling_thread_,
+ ret));
+
+ // We're not expecting input at this time, so we'll always
+ // return -1 for now.
+ // -1 and rec_n with tv = 0 worked
+ switch (ret)
+ {
+ case -1:
+ // Error...but we weren't expecting input, either...what
+ // should we do?
+ ACE_ERROR ((LM_WARNING,
+ "Client_Connection_Handler::handle_input: closing connection on fd %d\n",
+ this->peer().get_handle ()));
+ break;
+
+ case 1:
+ // We weren't expecting input, so what should we do with it?
+ // Log an error, and close the connection.
+ ACE_ERROR ((LM_WARNING,
+ "Client_Connection_Handler::handle_input received "
+ "input while not expecting a response; closing connection on fd %d\n",
+ this->peer().get_handle ()));
+ break;
+
+ case 0:
+ // This is an EOF, so we will return -1 and let
+ // handle_close() take over. As long as handle_close()
+ // calls the Svc_Handler<>::handle_close(), the socket will
+ // be shutdown properly.
+ break;
+ }
+
+ TAO_ORB_Core_instance ()->leader_follower_lock ().release ();
+ return retval;
+ }
+
+ if (this->calling_thread_ == ACE_Thread::self ())
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) Client_Connection_Handler::handle_input: Handler (%d): "
+ "right thread\n",
+ this->calling_thread_));
+ // we are now a leader getting its response
+ // or a follower faking the handle_input
+
+ this->input_available_ = 1;
+
+ TAO_ORB_Core_instance ()->leader_follower_lock ().release ();
+
+ TAO_ORB_Core_instance ()->reactor ()->suspend_handler (this);
+
+ return 0;
+ }
+ else
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) Client_Connection_Handler::handle_input: Handler (%d): "
+ "wrong thread\n",
+ this->calling_thread_));
+ // we are a leader, which got a response for one of the followers,
+ // which means we are now a thread running the wrong Client_Connection_Handler
+
+ // Close connection
+ if (this->calling_thread_ == 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) Client_Connection_Handler::handle_input: calling thread is null: "
+ "wrong thread\n",
+ this->calling_thread_));
+
+ //TAO_ORB_Core_instance ()->leader_follower_lock ().release ();
+ //return -1;
+ }
+
+ TAO_ORB_Core_instance ()->remove_follower (this->cond_response_available_);
+
+ TAO_ORB_Core_instance ()->leader_follower_lock ().release ();
+
+ TAO_ORB_Core_instance ()->reactor ()->suspend_handler (this);
+
+ TAO_ORB_Core_instance ()->leader_follower_lock ().acquire ();
+
+ // the thread was already selected to become a leader,
+ // so we will be called again.
+ this->input_available_ = 1;
+ this->cond_response_available_->signal ();
+
+ TAO_ORB_Core_instance ()->leader_follower_lock ().release ();
+
+
+ return 0;
+ }
}
int
diff --git a/TAO/tao/GIOP.cpp b/TAO/tao/GIOP.cpp
index 5faecca7023..4236a7c2151 100644
--- a/TAO/tao/GIOP.cpp
+++ b/TAO/tao/GIOP.cpp
@@ -905,6 +905,13 @@ TAO_GIOP_Invocation::invoke (CORBA::ExceptionList &exceptions,
TAO_SVC_HANDLER *handler = this->handler_;
TAO_GIOP::Message_Type m = TAO_GIOP::recv_request (handler,
this->inp_stream_);
+ {
+ //ACE_Guard<ACE_SYNCH_MUTEX> g (TAO_ORB_Core_instance ()->leader_follower_lock ());
+ TAO_ORB_Core_instance ()->reactor ()->resume_handler (this->handler_);
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) GIOP: resume.\n"));
+ }
+
switch (m)
{
case TAO_GIOP::Reply:
@@ -1279,6 +1286,14 @@ TAO_GIOP_Invocation::invoke (TAO_Exception_Data *excepts,
TAO_SVC_HANDLER *handler = this->handler_;
TAO_GIOP::Message_Type m = TAO_GIOP::recv_request (handler,
this->inp_stream_);
+
+ {
+ //ACE_Guard<ACE_SYNCH_MUTEX> g (TAO_ORB_Core_instance ()->leader_follower_lock ());
+ TAO_ORB_Core_instance ()->reactor ()->resume_handler (this->handler_);
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) GIOP: resume.\n"));
+ }
+
switch (m)
{
case TAO_GIOP::Reply:
diff --git a/TAO/tao/ORB.cpp b/TAO/tao/ORB.cpp
index c62216f6b74..416b39f276a 100644
--- a/TAO/tao/ORB.cpp
+++ b/TAO/tao/ORB.cpp
@@ -88,6 +88,8 @@ CORBA_ORB::CORBA_ORB (void)
event_service_ (CORBA_Object::_nil ()),
trading_service_ (CORBA_Object::_nil ())
{
+ this->cond_become_leader_ =
+ new ACE_SYNCH_CONDITION (TAO_ORB_Core_instance ()->leader_follower_lock ());
}
CORBA_ORB::~CORBA_ORB (void)
@@ -121,6 +123,9 @@ CORBA_ORB::~CORBA_ORB (void)
CORBA::release (this->event_service_);
if (!CORBA::is_nil (this->trading_service_))
CORBA::release (this->trading_service_);
+
+ if (this->cond_become_leader_ != 0)
+ this->cond_become_leader_;
}
// Set up listening endpoints.
@@ -229,6 +234,34 @@ CORBA_ORB::run (ACE_Time_Value *tv)
{
ACE_FUNCTION_TIMEPROBE (TAO_CORBA_ORB_RUN_START);
+
+ ACE_DEBUG ((LM_DEBUG,
+ "CORBA_ORB::run: (%d) Trying to become the leader.\n",
+ ACE_Thread::self ()));
+
+ {
+ //ACE_Guard<ACE_SYNCH_MUTEX> g (TAO_ORB_Core_instance ()->leader_follower_lock ());
+ TAO_ORB_Core_instance ()->leader_follower_lock ().acquire();
+
+ while (TAO_ORB_Core_instance ()->leader_available ())
+ {
+ if (TAO_ORB_Core_instance ()->add_follower (this->cond_become_leader_) == -1)
+ ACE_ERROR ((LM_ERROR,
+ "(%P|%t) ORB::run: Failed to add a follower thread\n"));
+ this->cond_become_leader_->wait ();
+ }
+ TAO_ORB_Core_instance ()->set_leader_thread ();
+ TAO_ORB_Core_instance ()->leader_follower_lock ().release ();
+ }
+
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) CORBA_ORB::run: is the leader.\n"));
+
+ if (this->shutdown_lock_ == 0)
+ this->shutdown_lock_ =
+ TAO_ORB_Core_instance ()->server_factory ()->create_event_loop_lock ();
+
+
if (this->shutdown_lock_ == 0)
this->shutdown_lock_ =
TAO_ORB_Core_instance ()->server_factory ()->create_event_loop_lock ();
@@ -255,43 +288,60 @@ CORBA_ORB::run (ACE_Time_Value *tv)
// while blocked on I/O.
ACE_GUARD_RETURN (ACE_Lock, monitor, *this->shutdown_lock_, -1);
+ int result = 1;
+ // 1 to detect that nothing went wrong
+
// Loop "forever" handling client requests.
while (this->should_shutdown_ == 0)
- {
+ {
if (monitor.release () == -1)
return -1;
#if 0
counter++;
if (counter == max_iterations)
- {
+ {
ACE_TIMEPROBE_PRINT;
ACE_TIMEPROBE_RESET;
counter = 0;
- }
+ }
ACE_FUNCTION_TIMEPROBE (TAO_CORBA_ORB_RUN_START);
#endif /* 0 */
switch (r->handle_events (tv))
- {
+ {
case 0: // Timed out, so we return to caller.
- return 0;
+ result = 0;
+ break;
/* NOTREACHED */
case -1: // Something else has gone wrong, so return to caller.
- return -1;
+ result = -1;
+ break;
/* NOTREACHED */
default: // Some handlers were dispatched, so keep on processing
// requests until we're told to shutdown .
break;
/* NOTREACHED */
- }
+ }
+ if (result == 0 || result == -1)
+ break;
if (monitor.acquire () == -1)
return -1;
- }
+ }
- return 0;
+ if (result != -1)
+ {
+ if (TAO_ORB_Core_instance ()->unset_leader_wake_up_follower () == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%P|%t) ORB::run: Failed to add a follower thread\n"),
+ -1);
+ return 0;
+ // nothing went wrong
+ }
+ else
+ return result;
}
int
diff --git a/TAO/tao/ORB.h b/TAO/tao/ORB.h
index 659d0f4fd75..c042e9a653f 100644
--- a/TAO/tao/ORB.h
+++ b/TAO/tao/ORB.h
@@ -963,6 +963,9 @@ private:
// Count of the number of times that <ORB_init> has been called.
// This must be protected by <ACE_Static_Object_Lock>.
+ ACE_SYNCH_CONDITION* cond_become_leader_;
+ // wait to become the leader if the leader-follower model is active
+
// = NON-PROVIDED METHODS
CORBA_ORB (const CORBA_ORB &);
CORBA_ORB &operator= (const CORBA_ORB &);
diff --git a/TAO/tao/ORB_Core.cpp b/TAO/tao/ORB_Core.cpp
index 4a94b7d7a1a..d0d5d23ba0a 100644
--- a/TAO/tao/ORB_Core.cpp
+++ b/TAO/tao/ORB_Core.cpp
@@ -49,9 +49,17 @@ TAO_ORB_Core::TAO_ORB_Core (void)
server_factory_from_service_config_ (CORBA::B_FALSE),
opt_for_collocation_ (CORBA::B_TRUE),
preconnections_ (0)
-{
+{
}
+ACE_SYNCH_MUTEX TAO_ORB_Core::leader_follower_lock_;
+
+ACE_Unbounded_Set<ACE_SYNCH_CONDITION*> TAO_ORB_Core::follower_set_;
+
+int TAO_ORB_Core::leaders_ = 0;
+
+ACE_thread_t TAO_ORB_Core::leader_thread_ID_ = 0;
+
TAO_ORB_Core::~TAO_ORB_Core (void)
{
// This should probably be changed to use the allocator internal to
@@ -462,6 +470,13 @@ TAO_ORB_Core::init (int& argc, char** argv)
this_orb->_use_omg_ior_format (use_ior);
this_orb->_optimize_collocation_objects (this->opt_for_collocation_);
+ // @@ Michael: I don't know if this is the best spot,
+ // we might have to discuss that.
+ //this->leader_follower_lock_ptr_ = this->client_factory ()
+ // ->create_leader_follower_lock ();
+
+
+
// Set all kinds of orb parameters whose setting needed to be
// deferred until after the service config entries had been
// determined.
@@ -905,6 +920,129 @@ TAO_ORB_Core::get_collocated_poa (ACE_INET_Addr &addr)
}
+int
+TAO_ORB_Core::leader_available (void)
+// returns the value of the flag indicating if a leader
+// is available in the leader-follower model
+{
+ return this->leaders_;
+}
+
+int
+TAO_ORB_Core::I_am_the_leader_thread (void)
+// returns 1 if we are the leader thread,
+// else 0
+{
+ if (this->leaders_)
+ return (this->leader_thread_ID_ == ACE_Thread::self ());
+ else
+ return 0;
+}
+
+void
+TAO_ORB_Core::set_leader_thread (void)
+// sets the thread ID of the leader thread in the leader-follower
+// model
+{
+ ACE_ASSERT ((this->leaders_ >= 1 && this->leader_thread_ID_ == ACE_Thread::self ())
+ || this->leaders_ == 0);
+ this->leaders_++;
+ this->leader_thread_ID_ = ACE_Thread::self ();
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) TAO_ORB_Core: New leader: leaders = %d\n",
+ this->leaders_));
+}
+
+int
+TAO_ORB_Core::unset_leader_wake_up_follower (void)
+// sets the leader_available flag to false and tries to wake up a follower
+{
+ ACE_Guard <ACE_SYNCH_MUTEX> g (TAO_ORB_Core_instance ()->leader_follower_lock ());
+
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) TAO_ORB_Core: unset leader\n"));
+ this->unset_leader_thread ();
+
+ if (TAO_ORB_Core_instance ()->follower_available ()
+ && !this->leader_available ())
+ // do it only if a follower is available and no leader is available
+ {
+ ACE_SYNCH_CONDITION* condition_ptr = this->get_next_follower ();
+ if (this->remove_follower (condition_ptr) == -1)
+ return -1;
+ condition_ptr->signal ();
+ }
+ else
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) TAO_ORB_Core: no new leader: "
+ "follower = %d leaders = %d leader_ID = %d\n",
+ TAO_ORB_Core_instance ()->follower_available (),
+ this->leaders_,
+ this->leader_thread_ID_));
+ return 0;
+}
+
+
+void
+TAO_ORB_Core::unset_leader_thread (void)
+// sets the flag in the leader-follower model to false
+{
+ ACE_ASSERT ((this->leaders_ > 1 && this->leader_thread_ID_ == ACE_Thread::self ())
+ || this->leaders_ == 1);
+ this->leaders_--;
+}
+
+
+ACE_SYNCH_MUTEX &
+TAO_ORB_Core::leader_follower_lock (void)
+// returns the leader-follower lock
+{
+ return this->leader_follower_lock_;
+}
+
+int
+TAO_ORB_Core::add_follower (ACE_SYNCH_CONDITION *follower_ptr)
+// adds the a follower to the set of followers in the leader-
+// follower model
+// returns 0 on success, -1 on failure
+{
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) TAO_ORB_Core::add_follower: add \n"));
+
+ if (this->follower_set_.insert (follower_ptr) != 0)
+ return -1;
+ return 0;
+}
+
+int
+TAO_ORB_Core::follower_available (void)
+// checks for the availablity of a follower
+// returns 1 on available, 0 else
+{
+ return !this->follower_set_.is_empty ();
+}
+
+int
+TAO_ORB_Core::remove_follower (ACE_SYNCH_CONDITION *follower_ptr)
+// removes a follower from the leader-follower set
+// returns 0 on success, -1 on failure
+{
+ return this->follower_set_.remove (follower_ptr);
+}
+
+ACE_SYNCH_CONDITION*
+TAO_ORB_Core::get_next_follower (void)
+// returns randomly a follower from the leader-follower set
+// returns follower on success, else 0
+{
+ ACE_Unbounded_Set_Iterator<ACE_SYNCH_CONDITION *> iterator (this->follower_set_);
+ if (iterator.first () == 0)
+ // means set is empty
+ return 0;
+ return *iterator;
+}
+
+
TAO_Resource_Factory::TAO_Resource_Factory (void)
: resource_source_ (TAO_GLOBAL),
poa_source_ (TAO_GLOBAL),
diff --git a/TAO/tao/ORB_Core.h b/TAO/tao/ORB_Core.h
index 3eaced2231c..6ed9e011133 100644
--- a/TAO/tao/ORB_Core.h
+++ b/TAO/tao/ORB_Core.h
@@ -155,6 +155,47 @@ public:
// See if we have a collocated address, if yes, return the POA
// associated with the address.
+ int leader_available (void);
+ // returns the refcount on the leader
+
+ int I_am_the_leader_thread (void);
+ // returns 1 if we are the leader thread,
+ // else 0
+
+ void set_leader_thread (void) ;
+ // sets the thread_available flag and the thread ID of the leader
+ // thread in the leader-follower model
+
+ void set_leader_thread (ACE_thread_t thread_ID);
+ // sets the thread ID of the leader thread in the leader-follower
+ // model
+
+ void unset_leader_thread (void);
+ // sets the leader_available flag to false
+
+ int unset_leader_wake_up_follower (void);
+ // sets the leader_available flag to false
+ // and wakes up a new follower
+
+ ACE_SYNCH_MUTEX &leader_follower_lock (void);
+ // returns the leader-follower lock
+
+ int add_follower (ACE_SYNCH_CONDITION *follower_ptr);
+ // adds the a follower to the set of followers in the leader-
+ // follower model
+ // returns 0 on success, -1 on failure
+
+ int follower_available ();
+ // checks for the availablity of a follower
+ // returns 1 on available, 0 else
+
+ int remove_follower (ACE_SYNCH_CONDITION *follower_ptr);
+ // removes a follower from the leader-follower set
+ // returns 0 on success, -1 on failure
+
+ ACE_SYNCH_CONDITION *get_next_follower (void);
+ // returns randomly a follower from the leader-follower set
+ // returns follower on success, else 0
private:
int init (int& argc, char ** argv);
// Initialize the guts of the ORB Core. It is intended that this be
@@ -259,6 +300,20 @@ private:
char *preconnections_;
// A string of comma-separated <{host}>:<{port}> pairs used to
// pre-establish connections using <preconnect>.
+
+ static ACE_SYNCH_MUTEX leader_follower_lock_;
+ // do protect the access to the following three members
+
+ static ACE_Unbounded_Set<ACE_SYNCH_CONDITION *> follower_set_;
+ // keep a set of followers around (protected)
+
+ static int leaders_;
+ // 0 if no leader is around, 1 if there is a leader
+ // > 1 if we do nested upcalls (protected)
+
+ static ACE_thread_t leader_thread_ID_;
+ // thread ID of the leader thread (protected)
+
};
class TAO_Default_Reactor : public ACE_Reactor