diff options
author | mk1 <mk1@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1998-07-12 21:19:27 +0000 |
---|---|---|
committer | mk1 <mk1@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1998-07-12 21:19:27 +0000 |
commit | d8877a0192a37586185bdd3cfe7609b62ddedd61 (patch) | |
tree | 5d0fb41810567f9915823e08b11b6561f7bff879 | |
parent | 9d981ec55ff95442c8daaf643bf7fff2ee3ca3d6 (diff) | |
download | ATCD-d8877a0192a37586185bdd3cfe7609b62ddedd61.tar.gz |
New Branch for the multithreaded client ORB
-rw-r--r-- | TAO/tao/Connect.cpp | 306 | ||||
-rw-r--r-- | TAO/tao/GIOP.cpp | 15 | ||||
-rw-r--r-- | TAO/tao/ORB.cpp | 68 | ||||
-rw-r--r-- | TAO/tao/ORB.h | 3 | ||||
-rw-r--r-- | TAO/tao/ORB_Core.cpp | 140 | ||||
-rw-r--r-- | TAO/tao/ORB_Core.h | 55 |
6 files changed, 513 insertions, 74 deletions
diff --git a/TAO/tao/Connect.cpp b/TAO/tao/Connect.cpp index c370caf1fb7..2cddc3c285a 100644 --- a/TAO/tao/Connect.cpp +++ b/TAO/tao/Connect.cpp @@ -402,6 +402,8 @@ TAO_Server_Connection_Handler::handle_input (ACE_HANDLE) ACE_FUNCTION_TIMEPROBE (TAO_SERVER_CONNECTION_HANDLER_HANDLE_INPUT_START); + + // @@ TODO This should take its memory from a specialized // allocator. It is better to use a message block than a on stack // buffer because we cannot minimize memory copies in that case. @@ -490,8 +492,16 @@ TAO_Server_Connection_Handler::handle_input (ACE_HANDLE) TAO_Client_Connection_Handler::TAO_Client_Connection_Handler (ACE_Thread_Manager *t) : TAO_SVC_HANDLER (t, 0, 0), - input_available_ (0) + input_available_ (0), + calling_thread_ (0) +{ + this->cond_response_available_ = + new ACE_SYNCH_CONDITION (TAO_ORB_Core_instance ()->leader_follower_lock ()); +} + +TAO_Client_Connection_Handler::~TAO_Client_Connection_Handler () { + delete this->cond_response_available_; } int @@ -576,29 +586,118 @@ TAO_Client_Connection_Handler::send_request (TAO_OutputCDR &stream, return -1; if (is_twoway) + { + // remember in which thread the client connection handler was running + this->calling_thread_ = ACE_Thread::self (); + if (TAO_ORB_Core_instance ()->leader_follower_lock ().acquire() == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%P|%t) TAO_Client_Connection_Handler::send_request: " + "Failed to get the lock.\n"), + -1); + + ACE_DEBUG ((LM_DEBUG, + "Client_Connection_Handler::send_request: (%d) starting\n", + ACE_Thread::self ())); + + // check if there is a leader, but the leader is not us + if (TAO_ORB_Core_instance ()->leader_available () + && !TAO_ORB_Core_instance ()->I_am_the_leader_thread ()) { - // Go into a loop, waiting until it's safe to try to read - // something on the soket. The handle_input() method doesn't - // actualy do the read, though, proper behavior based on what is - // read may be different if we're not using GIOP above here. - // So, we leave the reading of the response to the caller of - // this method, and simply insure that this method doesn't - // return until such time as doing a recv() on the socket would - // actually produce fruit. - ACE_Reactor *r = TAO_ORB_Core_instance ()->reactor (); - - int ret = 0; - - while (ret != -1 && ! this->input_available_) - ret = r->handle_events (); - - this->input_available_ = 0; - // We can get events now, b/c we want them! - r->resume_handler (this); - // We're no longer expecting a response! - this->expecting_response_ = 0; + // wait as long as no input is available and/or + // no leader is available + while (!this->input_available_ + && TAO_ORB_Core_instance ()->leader_available ()) + { + if (TAO_ORB_Core_instance ()->add_follower (this->cond_response_available_) == -1) + ACE_ERROR ((LM_ERROR, + "(%P|%t) TAO_Client_Connection_Handler::send_request: " + "Failed to add a follower thread\n")); + this->cond_response_available_->wait (); + } + // now somebody woke us up to become a leader or + // to handle our input. We are already removed from the + // follower queue + if (this->input_available_) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Client_Connection_Handler::send_request: fake handle_input\n", + ACE_Thread::self ())); + + + + // there is input waiting for me + TAO_ORB_Core_instance ()->leader_follower_lock ().release (); + + int ret = 0; //this->handle_input (); // fake the handle_input + if (ret < 0) + { + ACE_DEBUG ((LM_DEBUG, + "Client_Connection_Handler::send_request: (%d) " + "failure faking handle_input\n", + ACE_Thread::self ())); + TAO_ORB_Core_instance ()->reactor ()->remove_handler (this, + ACE_Event_Handler::ALL_EVENTS_MASK); + // failure handling + return -1; + } + /* else if (ret > 0) + // we have to reschedule, not implemented yet + */ + + // the following variables are safe, because we are not registered with + // the reactor any more. + this->input_available_ = 0; + this->expecting_response_ = 0; + this->calling_thread_ = 0; + return 0; + } } + // become a leader, because there is no leader or we have to update to a leader + // or we are doing nested upcalls in this case we do increase the refcount + ACE_DEBUG ((LM_DEBUG, + "Client_Connection_Handler::send_request: (%d) become a leader\n", + ACE_Thread::self ())); + + TAO_ORB_Core_instance ()->set_leader_thread (); + // this might increase the recount of the leader + + if (TAO_ORB_Core_instance ()->leader_follower_lock ().release () == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%P|%t) TAO_Client_Connection_Handler::send_request: " + "Failed to release the lock.\n"), + -1); + + ACE_Reactor *r = TAO_ORB_Core_instance ()->reactor (); + r->owner (ACE_Thread::self ()); + + int ret = 0; + + while (ret != -1 && !this->input_available_) + ret = r->handle_events (); + + if (ret == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%P|%t) TAO_Client_Connection_Handler::send_request: " + "handle_events failed.\n"), + -1); + + + // wake up the next leader, we cannot do that in handle_input, + // because the woken up thread would try to get into handle_events, + // which is at the time in handle_input still occupied. + + if (TAO_ORB_Core_instance ()->unset_leader_wake_up_follower () == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%P|%t) TAO_Client_Connection_Handler::send_request: " + "Failed to unset the leader and wake up a new follower.\n"), + -1); + + this->input_available_ = 0; + this->expecting_response_ = 0; + this->calling_thread_ = 0; + } + return 0; } @@ -607,54 +706,133 @@ TAO_Client_Connection_Handler::handle_input (ACE_HANDLE) { int retval = 0; - if (this->expecting_response_) + + TAO_ORB_Core_instance ()->leader_follower_lock ().acquire (); + + if (!this->expecting_response_) + { + // we got something, but did not want + // @@ wake up an other thread, we are lost + + // We're a client, so we're not expecting to see input. Still + // we better check what it is! + char ignored; + ssize_t ret; + ACE_Time_Value tv = ACE_Time_Value::zero; + retval = 0; + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Client_Connection_Handler::handle_input: Handler (%d) " + "not expected response\n", + this->calling_thread_)); + retval = -1; + if (this->calling_thread_ == 0) { - this->input_available_ = 1; - // Temporarily remove ourself from notification so that if - // another sub event loop is in effect still waiting for its - // response, it doesn't spin tightly gobbling up CPU. - TAO_ORB_Core_instance ()->reactor ()->suspend_handler (this); + ret = this->peer().recv (&ignored, sizeof ignored, MSG_PEEK, &tv); + retval = 0; + // if -1 is returned, the nested upcalls server crashes, + // if the tv value is not specified we will hang in a blocking read } - else + else if (ret = this->peer().recv (&ignored, sizeof ignored, MSG_PEEK) > 0) { - // We're a client, so we're not expecting to see input. Still - // we better check what it is! - char ignored; - ssize_t ret = this->peer().recv (&ignored, sizeof ignored, MSG_PEEK); - - // We're not expecting input at this time, so we'll always - // return -1 for now. - retval = -1; - switch (ret) - { - case -1: - // Error...but we weren't expecting input, either...what - // should we do? - ACE_ERROR ((LM_WARNING, - "Client_Connection_Handler::handle_input received " - "error while reading unexpected input; closing connection on fd %d\n", - this->peer().get_handle ())); - break; - - case 1: - // We weren't expecting input, so what should we do with it? - // Log an error, and close the connection. - ACE_ERROR ((LM_WARNING, - "Client_Connection_Handler::handle_input received " - "input while not expecting a response; closing connection on fd %d\n", - this->peer().get_handle ())); - break; - - case 0: - // This is an EOF, so we will return -1 and let - // handle_close() take over. As long as handle_close() - // calls the Svc_Handler<>::handle_close(), the socket will - // be shutdown properly. - break; - } + ret = this->peer().recv_n (&ignored, sizeof ignored); } - return retval; + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Client_Connection_Handler::handle_input: Handler (%d) " + "ret = %d\n", + this->calling_thread_, + ret)); + + // We're not expecting input at this time, so we'll always + // return -1 for now. + // -1 and rec_n with tv = 0 worked + switch (ret) + { + case -1: + // Error...but we weren't expecting input, either...what + // should we do? + ACE_ERROR ((LM_WARNING, + "Client_Connection_Handler::handle_input: closing connection on fd %d\n", + this->peer().get_handle ())); + break; + + case 1: + // We weren't expecting input, so what should we do with it? + // Log an error, and close the connection. + ACE_ERROR ((LM_WARNING, + "Client_Connection_Handler::handle_input received " + "input while not expecting a response; closing connection on fd %d\n", + this->peer().get_handle ())); + break; + + case 0: + // This is an EOF, so we will return -1 and let + // handle_close() take over. As long as handle_close() + // calls the Svc_Handler<>::handle_close(), the socket will + // be shutdown properly. + break; + } + + TAO_ORB_Core_instance ()->leader_follower_lock ().release (); + return retval; + } + + if (this->calling_thread_ == ACE_Thread::self ()) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Client_Connection_Handler::handle_input: Handler (%d): " + "right thread\n", + this->calling_thread_)); + // we are now a leader getting its response + // or a follower faking the handle_input + + this->input_available_ = 1; + + TAO_ORB_Core_instance ()->leader_follower_lock ().release (); + + TAO_ORB_Core_instance ()->reactor ()->suspend_handler (this); + + return 0; + } + else + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Client_Connection_Handler::handle_input: Handler (%d): " + "wrong thread\n", + this->calling_thread_)); + // we are a leader, which got a response for one of the followers, + // which means we are now a thread running the wrong Client_Connection_Handler + + // Close connection + if (this->calling_thread_ == 0) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Client_Connection_Handler::handle_input: calling thread is null: " + "wrong thread\n", + this->calling_thread_)); + + //TAO_ORB_Core_instance ()->leader_follower_lock ().release (); + //return -1; + } + + TAO_ORB_Core_instance ()->remove_follower (this->cond_response_available_); + + TAO_ORB_Core_instance ()->leader_follower_lock ().release (); + + TAO_ORB_Core_instance ()->reactor ()->suspend_handler (this); + + TAO_ORB_Core_instance ()->leader_follower_lock ().acquire (); + + // the thread was already selected to become a leader, + // so we will be called again. + this->input_available_ = 1; + this->cond_response_available_->signal (); + + TAO_ORB_Core_instance ()->leader_follower_lock ().release (); + + + return 0; + } } int diff --git a/TAO/tao/GIOP.cpp b/TAO/tao/GIOP.cpp index 5faecca7023..4236a7c2151 100644 --- a/TAO/tao/GIOP.cpp +++ b/TAO/tao/GIOP.cpp @@ -905,6 +905,13 @@ TAO_GIOP_Invocation::invoke (CORBA::ExceptionList &exceptions, TAO_SVC_HANDLER *handler = this->handler_; TAO_GIOP::Message_Type m = TAO_GIOP::recv_request (handler, this->inp_stream_); + { + //ACE_Guard<ACE_SYNCH_MUTEX> g (TAO_ORB_Core_instance ()->leader_follower_lock ()); + TAO_ORB_Core_instance ()->reactor ()->resume_handler (this->handler_); + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) GIOP: resume.\n")); + } + switch (m) { case TAO_GIOP::Reply: @@ -1279,6 +1286,14 @@ TAO_GIOP_Invocation::invoke (TAO_Exception_Data *excepts, TAO_SVC_HANDLER *handler = this->handler_; TAO_GIOP::Message_Type m = TAO_GIOP::recv_request (handler, this->inp_stream_); + + { + //ACE_Guard<ACE_SYNCH_MUTEX> g (TAO_ORB_Core_instance ()->leader_follower_lock ()); + TAO_ORB_Core_instance ()->reactor ()->resume_handler (this->handler_); + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) GIOP: resume.\n")); + } + switch (m) { case TAO_GIOP::Reply: diff --git a/TAO/tao/ORB.cpp b/TAO/tao/ORB.cpp index c62216f6b74..416b39f276a 100644 --- a/TAO/tao/ORB.cpp +++ b/TAO/tao/ORB.cpp @@ -88,6 +88,8 @@ CORBA_ORB::CORBA_ORB (void) event_service_ (CORBA_Object::_nil ()), trading_service_ (CORBA_Object::_nil ()) { + this->cond_become_leader_ = + new ACE_SYNCH_CONDITION (TAO_ORB_Core_instance ()->leader_follower_lock ()); } CORBA_ORB::~CORBA_ORB (void) @@ -121,6 +123,9 @@ CORBA_ORB::~CORBA_ORB (void) CORBA::release (this->event_service_); if (!CORBA::is_nil (this->trading_service_)) CORBA::release (this->trading_service_); + + if (this->cond_become_leader_ != 0) + this->cond_become_leader_; } // Set up listening endpoints. @@ -229,6 +234,34 @@ CORBA_ORB::run (ACE_Time_Value *tv) { ACE_FUNCTION_TIMEPROBE (TAO_CORBA_ORB_RUN_START); + + ACE_DEBUG ((LM_DEBUG, + "CORBA_ORB::run: (%d) Trying to become the leader.\n", + ACE_Thread::self ())); + + { + //ACE_Guard<ACE_SYNCH_MUTEX> g (TAO_ORB_Core_instance ()->leader_follower_lock ()); + TAO_ORB_Core_instance ()->leader_follower_lock ().acquire(); + + while (TAO_ORB_Core_instance ()->leader_available ()) + { + if (TAO_ORB_Core_instance ()->add_follower (this->cond_become_leader_) == -1) + ACE_ERROR ((LM_ERROR, + "(%P|%t) ORB::run: Failed to add a follower thread\n")); + this->cond_become_leader_->wait (); + } + TAO_ORB_Core_instance ()->set_leader_thread (); + TAO_ORB_Core_instance ()->leader_follower_lock ().release (); + } + + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) CORBA_ORB::run: is the leader.\n")); + + if (this->shutdown_lock_ == 0) + this->shutdown_lock_ = + TAO_ORB_Core_instance ()->server_factory ()->create_event_loop_lock (); + + if (this->shutdown_lock_ == 0) this->shutdown_lock_ = TAO_ORB_Core_instance ()->server_factory ()->create_event_loop_lock (); @@ -255,43 +288,60 @@ CORBA_ORB::run (ACE_Time_Value *tv) // while blocked on I/O. ACE_GUARD_RETURN (ACE_Lock, monitor, *this->shutdown_lock_, -1); + int result = 1; + // 1 to detect that nothing went wrong + // Loop "forever" handling client requests. while (this->should_shutdown_ == 0) - { + { if (monitor.release () == -1) return -1; #if 0 counter++; if (counter == max_iterations) - { + { ACE_TIMEPROBE_PRINT; ACE_TIMEPROBE_RESET; counter = 0; - } + } ACE_FUNCTION_TIMEPROBE (TAO_CORBA_ORB_RUN_START); #endif /* 0 */ switch (r->handle_events (tv)) - { + { case 0: // Timed out, so we return to caller. - return 0; + result = 0; + break; /* NOTREACHED */ case -1: // Something else has gone wrong, so return to caller. - return -1; + result = -1; + break; /* NOTREACHED */ default: // Some handlers were dispatched, so keep on processing // requests until we're told to shutdown . break; /* NOTREACHED */ - } + } + if (result == 0 || result == -1) + break; if (monitor.acquire () == -1) return -1; - } + } - return 0; + if (result != -1) + { + if (TAO_ORB_Core_instance ()->unset_leader_wake_up_follower () == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%P|%t) ORB::run: Failed to add a follower thread\n"), + -1); + return 0; + // nothing went wrong + } + else + return result; } int diff --git a/TAO/tao/ORB.h b/TAO/tao/ORB.h index 659d0f4fd75..c042e9a653f 100644 --- a/TAO/tao/ORB.h +++ b/TAO/tao/ORB.h @@ -963,6 +963,9 @@ private: // Count of the number of times that <ORB_init> has been called. // This must be protected by <ACE_Static_Object_Lock>. + ACE_SYNCH_CONDITION* cond_become_leader_; + // wait to become the leader if the leader-follower model is active + // = NON-PROVIDED METHODS CORBA_ORB (const CORBA_ORB &); CORBA_ORB &operator= (const CORBA_ORB &); diff --git a/TAO/tao/ORB_Core.cpp b/TAO/tao/ORB_Core.cpp index 4a94b7d7a1a..d0d5d23ba0a 100644 --- a/TAO/tao/ORB_Core.cpp +++ b/TAO/tao/ORB_Core.cpp @@ -49,9 +49,17 @@ TAO_ORB_Core::TAO_ORB_Core (void) server_factory_from_service_config_ (CORBA::B_FALSE), opt_for_collocation_ (CORBA::B_TRUE), preconnections_ (0) -{ +{ } +ACE_SYNCH_MUTEX TAO_ORB_Core::leader_follower_lock_; + +ACE_Unbounded_Set<ACE_SYNCH_CONDITION*> TAO_ORB_Core::follower_set_; + +int TAO_ORB_Core::leaders_ = 0; + +ACE_thread_t TAO_ORB_Core::leader_thread_ID_ = 0; + TAO_ORB_Core::~TAO_ORB_Core (void) { // This should probably be changed to use the allocator internal to @@ -462,6 +470,13 @@ TAO_ORB_Core::init (int& argc, char** argv) this_orb->_use_omg_ior_format (use_ior); this_orb->_optimize_collocation_objects (this->opt_for_collocation_); + // @@ Michael: I don't know if this is the best spot, + // we might have to discuss that. + //this->leader_follower_lock_ptr_ = this->client_factory () + // ->create_leader_follower_lock (); + + + // Set all kinds of orb parameters whose setting needed to be // deferred until after the service config entries had been // determined. @@ -905,6 +920,129 @@ TAO_ORB_Core::get_collocated_poa (ACE_INET_Addr &addr) } +int +TAO_ORB_Core::leader_available (void) +// returns the value of the flag indicating if a leader +// is available in the leader-follower model +{ + return this->leaders_; +} + +int +TAO_ORB_Core::I_am_the_leader_thread (void) +// returns 1 if we are the leader thread, +// else 0 +{ + if (this->leaders_) + return (this->leader_thread_ID_ == ACE_Thread::self ()); + else + return 0; +} + +void +TAO_ORB_Core::set_leader_thread (void) +// sets the thread ID of the leader thread in the leader-follower +// model +{ + ACE_ASSERT ((this->leaders_ >= 1 && this->leader_thread_ID_ == ACE_Thread::self ()) + || this->leaders_ == 0); + this->leaders_++; + this->leader_thread_ID_ = ACE_Thread::self (); + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) TAO_ORB_Core: New leader: leaders = %d\n", + this->leaders_)); +} + +int +TAO_ORB_Core::unset_leader_wake_up_follower (void) +// sets the leader_available flag to false and tries to wake up a follower +{ + ACE_Guard <ACE_SYNCH_MUTEX> g (TAO_ORB_Core_instance ()->leader_follower_lock ()); + + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) TAO_ORB_Core: unset leader\n")); + this->unset_leader_thread (); + + if (TAO_ORB_Core_instance ()->follower_available () + && !this->leader_available ()) + // do it only if a follower is available and no leader is available + { + ACE_SYNCH_CONDITION* condition_ptr = this->get_next_follower (); + if (this->remove_follower (condition_ptr) == -1) + return -1; + condition_ptr->signal (); + } + else + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) TAO_ORB_Core: no new leader: " + "follower = %d leaders = %d leader_ID = %d\n", + TAO_ORB_Core_instance ()->follower_available (), + this->leaders_, + this->leader_thread_ID_)); + return 0; +} + + +void +TAO_ORB_Core::unset_leader_thread (void) +// sets the flag in the leader-follower model to false +{ + ACE_ASSERT ((this->leaders_ > 1 && this->leader_thread_ID_ == ACE_Thread::self ()) + || this->leaders_ == 1); + this->leaders_--; +} + + +ACE_SYNCH_MUTEX & +TAO_ORB_Core::leader_follower_lock (void) +// returns the leader-follower lock +{ + return this->leader_follower_lock_; +} + +int +TAO_ORB_Core::add_follower (ACE_SYNCH_CONDITION *follower_ptr) +// adds the a follower to the set of followers in the leader- +// follower model +// returns 0 on success, -1 on failure +{ + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) TAO_ORB_Core::add_follower: add \n")); + + if (this->follower_set_.insert (follower_ptr) != 0) + return -1; + return 0; +} + +int +TAO_ORB_Core::follower_available (void) +// checks for the availablity of a follower +// returns 1 on available, 0 else +{ + return !this->follower_set_.is_empty (); +} + +int +TAO_ORB_Core::remove_follower (ACE_SYNCH_CONDITION *follower_ptr) +// removes a follower from the leader-follower set +// returns 0 on success, -1 on failure +{ + return this->follower_set_.remove (follower_ptr); +} + +ACE_SYNCH_CONDITION* +TAO_ORB_Core::get_next_follower (void) +// returns randomly a follower from the leader-follower set +// returns follower on success, else 0 +{ + ACE_Unbounded_Set_Iterator<ACE_SYNCH_CONDITION *> iterator (this->follower_set_); + if (iterator.first () == 0) + // means set is empty + return 0; + return *iterator; +} + + TAO_Resource_Factory::TAO_Resource_Factory (void) : resource_source_ (TAO_GLOBAL), poa_source_ (TAO_GLOBAL), diff --git a/TAO/tao/ORB_Core.h b/TAO/tao/ORB_Core.h index 3eaced2231c..6ed9e011133 100644 --- a/TAO/tao/ORB_Core.h +++ b/TAO/tao/ORB_Core.h @@ -155,6 +155,47 @@ public: // See if we have a collocated address, if yes, return the POA // associated with the address. + int leader_available (void); + // returns the refcount on the leader + + int I_am_the_leader_thread (void); + // returns 1 if we are the leader thread, + // else 0 + + void set_leader_thread (void) ; + // sets the thread_available flag and the thread ID of the leader + // thread in the leader-follower model + + void set_leader_thread (ACE_thread_t thread_ID); + // sets the thread ID of the leader thread in the leader-follower + // model + + void unset_leader_thread (void); + // sets the leader_available flag to false + + int unset_leader_wake_up_follower (void); + // sets the leader_available flag to false + // and wakes up a new follower + + ACE_SYNCH_MUTEX &leader_follower_lock (void); + // returns the leader-follower lock + + int add_follower (ACE_SYNCH_CONDITION *follower_ptr); + // adds the a follower to the set of followers in the leader- + // follower model + // returns 0 on success, -1 on failure + + int follower_available (); + // checks for the availablity of a follower + // returns 1 on available, 0 else + + int remove_follower (ACE_SYNCH_CONDITION *follower_ptr); + // removes a follower from the leader-follower set + // returns 0 on success, -1 on failure + + ACE_SYNCH_CONDITION *get_next_follower (void); + // returns randomly a follower from the leader-follower set + // returns follower on success, else 0 private: int init (int& argc, char ** argv); // Initialize the guts of the ORB Core. It is intended that this be @@ -259,6 +300,20 @@ private: char *preconnections_; // A string of comma-separated <{host}>:<{port}> pairs used to // pre-establish connections using <preconnect>. + + static ACE_SYNCH_MUTEX leader_follower_lock_; + // do protect the access to the following three members + + static ACE_Unbounded_Set<ACE_SYNCH_CONDITION *> follower_set_; + // keep a set of followers around (protected) + + static int leaders_; + // 0 if no leader is around, 1 if there is a leader + // > 1 if we do nested upcalls (protected) + + static ACE_thread_t leader_thread_ID_; + // thread ID of the leader thread (protected) + }; class TAO_Default_Reactor : public ACE_Reactor |