diff options
author | coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2001-07-27 00:14:10 +0000 |
---|---|---|
committer | coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2001-07-27 00:14:10 +0000 |
commit | b9670d1f13fbbfa54e42418fd6460eb1b05198ef (patch) | |
tree | 96c21f9423622d765d61f20285e2d640043639fd | |
parent | dcfd08cc4932d375f5023563455aebd18b6721ed (diff) | |
download | ATCD-b9670d1f13fbbfa54e42418fd6460eb1b05198ef.tar.gz |
ChangeLogTag:Thu Jul 26 16:50:46 2001 Carlos O'Ryan <coryan@uci.edu>
72 files changed, 1889 insertions, 986 deletions
diff --git a/TAO/ChangeLogs/ChangeLog-02a b/TAO/ChangeLogs/ChangeLog-02a index d0e353764a3..d650871d008 100644 --- a/TAO/ChangeLogs/ChangeLog-02a +++ b/TAO/ChangeLogs/ChangeLog-02a @@ -1,17 +1,184 @@ +Thu Jul 26 16:50:46 2001 Carlos O'Ryan <coryan@uci.edu> + + * Part of the fixes for + http://ace.cs.wustl.edu/bugzilla/show_bug.cgi?id=886 + + the changes also close the following bug: + + http://ace.cs.wustl.edu/bugzilla/show_bug.cgi?id=296 + + * tao/Leader_Follower_Flushing_Strategy.h: + * tao/Leader_Follower_Flushing_Strategy.cpp: + New flushing strategy that participates in the Leader/Followers + protocol. + To support this several changes to the Leader/Followers + implementation were required. The most important involved using + some abstract representation for the events that the + Leader/Followers wait for, in the old days there were only reply + events, so there was no need to abstract anything, but now the + Leader/Followers set can wait for both 'message flushed' events, + as well as 'reply received'. + With this explicit representation for events at hand it was + easier to encapsulate the Leader/Followers wait loop in + TAO_Leader_Follower class, instead of hidden in + Wait_On_Leader_Follower. + To match the events that L/F waits for and the threads waiting + for them we addd a class that represents a Follower thread. + These TAO_Follower objects had to implement an intrusive list + for fast addition into the follower set, once that intrusive + list was implemented adding a free list was trivial, and thus we + could solve bug 296 easily too. + + * tao/Asynch_Queued_Message.cpp: + * tao/Synch_Queued_Message.cpp: + Use the TAO_LF_Event methods to signal any waiters when the + state changes. + + * tao/Follower.h: + * tao/Follower.inl: + * tao/Follower.cpp: + This class represents a thread playing the Follower role. It + contains the condition variable used by the thread. + The class provides the necessary hooks to implement an intrusive + linked list. + + * tao/Invocation.cpp: + The waiting strategy wants the complete Synch_Reply_Dispatcher, + not just the reply_received flag. + + * tao/LF_Event.h: + * tao/LF_Event.inl: + * tao/LF_Event.cpp: + New class to represent events that the Leader/Followers loop + waits for. Used as a base class for both TAO_Queued_Message and + for TAO_Synch_Reply. + + * tao/LF_Event_Loop_Thread_Helper.h: + * tao/LF_Event_Loop_Thread_Helper.inl: + * tao/LF_Event_Loop_Thread_Helper.cpp: + Move helper class to its own file, no sense in exposing it to + everybody through the Leader_Follower.h file. + + * tao/Leader_Follower.h: + * tao/Leader_Follower.i: + * tao/Leader_Follower.cpp: + Add free list for TAO_Follower, as well as allocation and + deallocation methods. + Move Leader/Followers main loop to this class. + Move LF_Strategy and friends to their own files. + + * tao/ORB_Core.h: + * tao/ORB_Core.i: + * tao/ORB_Core.cpp: + Removed the TSS Leader/Followers condition variable, the + Leader/Followers free list implements the same optimization with + less problems (i.e. without bug 296). + + * tao/Queued_Message.h: + * tao/Queued_Message.cpp: + * tao/Synch_Reply_Dispatcher.h: + * tao/Synch_Reply_Dispatcher.cpp: + This class derives from TAO_LF_Event now. Any state or methods + required to detect timeouts, closed connections or transmition + errors are in the base class. + + * tao/Reply_Dispatcher.h: + * tao/Asynch_Reply_Dispatcher.h: + * tao/Asynch_Reply_Dispatcher.cpp: + Remove the dispatcher_bound() calls, they are no longer required + to match follower threads and their reply dispatchers, this is + now done in the TAO_LF_Event::bind() method, called from + TAO_Leader_Follower::wait_for_event() + + * tao/Transport.h: + * tao/Transport.cpp: + + * tao/Transport_Mux_Strategy.h: + * tao/Transport_Mux_Strategy.cpp: + * tao/Muxed_TMS.cpp: + * tao/Exclusive_TMS.cpp: + Since there is no need to call dispatcher_bound() anymore the + bind_dispatcher() methods were simplified. + + * tao/Wait_On_Leader_Follower.h: + * tao/Wait_On_Leader_Follower.cpp: + * tao/Wait_On_Reactor.h: + * tao/Wait_On_Reactor.cpp: + * tao/Wait_On_Read.h: + * tao/Wait_On_Read.cpp: + * tao/Wait_Strategy.h: + * tao/Wait_Strategy.cpp: + Use a TAO_Synch_Reply_Dispatcher to wait for a reply. The hack + using a reply_received flag + a cond.var. was too ugly, plus it + was tightly coupling the Leader/Followers loop to the reply + dispatching logic. + + * tao/default_resource.h: + * tao/default_resource.cpp: + Made Leader_Follower_Flushing_Strategy the default. + + * tao/orbconf.h: + * tao/default_client.cpp: + Made Muxed_TMS the default + + * tao/LF_Strategy.h: + * tao/LF_Strategy.inl: + * tao/LF_Strategy.cpp: + * tao/LF_Strategy_Complete.h: + * tao/LF_Strategy_Complete.inl: + * tao/LF_Strategy_Complete.cpp: + Move the LF_Strategy classes to their own files, no sense in + exposing them to everybody through the Leader_Follower.h file. + + * tao/Follower_Auto_Ptr.h: + * tao/Follower_Auto_Ptr.inl: + * tao/Follower_Auto_Ptr.cpp: + Helper class to automatically allocate and deallocate + TAO_Follower objects from the Leader/Followers set. + + * tao/GIOP_Message_Base.cpp: + * tao/GIOP_Message_Lite.cpp: + * tao/Reactor_Registry.cpp: + Must #include the "LF_Strategy.h" file explicitly. + + * tao/TAO.dsp: + * tao/TAO_Static.dsp: + * tao/Makefile: + * tao/Makefile.bor: + * tao/Strategies/TAO_Strategies.dsp: + * tao/Strategies/TAO_Strategies_Static.dsp: + * tao/Strategies/Makefile: + * tao/Strategies/Makefile.bor: + Add new files to the projects and Makefile + + + * tao/Strategies/advanced_resource.cpp: + * tao/Strategies/LF_Strategy_Null.h: + * tao/Strategies/LF_Strategy_Null.inl: + * tao/Strategies/LF_Strategy_Null.cpp: + Move the Null Leader/Follower Strategy to the TAO_Strategies + library, it was in TAO, but was only used here. + + * tao/RTPortableServer/TAO_RTPortableServer.dsp: + Fixed missing libraries in link line. + + * tao/TAO.dsw: + Add missing dependencies for RTPortableServer and RTCORBA + Thu Jul 26 09:44:00 2001 Craig Rodrigues <crodrigu@bbn.com> * orbsvcs/orbsvcs/AV/QoS_UDP.cpp: - Hide more debugging messages behind: if( TAO_debug_level > 0) + Hide more debugging messages behind: if( TAO_debug_level > 0) Thu Jul 26 07:37:29 2001 Balachandran Natarajan <bala@cs.wustl.edu> - * tests/RTCORBA/Server_Protocol/server.cpp: - * tests/RTCORBA/Client_Propagated/server.cpp: - * tests/RTCORBA/Thread_Pool/server.cpp: Fixed warnings in g++. + * tests/RTCORBA/Server_Protocol/server.cpp: + * tests/RTCORBA/Client_Propagated/server.cpp: + * tests/RTCORBA/Thread_Pool/server.cpp: Fixed warnings in g++. Wed Jul 25 23:37:00 2001 Craig Rodrigues <crodrigu@bbn.com> - + * orbsvcs/tests/AVStreams/Full_Profile/Makefile: * orbsvcs/tests/AVStreams/Latency/Makefile: * orbsvcs/tests/AVStreams/Modify_QoS/Makefile: @@ -21,77 +188,77 @@ Wed Jul 25 23:37:00 2001 Craig Rodrigues <crodrigu@bbn.com> * orbsvcs/tests/AVStreams/Simple_Three_Stage/Makefile: * orbsvcs/tests/AVStreams/Simple_Two_Stage/Makefile: * orbsvcs/orbsvcs/Makefile.av: - + Correct link flags so that ACE_QoS is linked in when rapi=1 is specified in platform_macros.GNU. Wed Jul 25 22:45:10 2001 Balachandran Natarajan <bala@cs.wustl.edu> - * tao/Strategies/SHMIOP_Transport.cpp: - * tao/Strategies/DIOP_Transport.cpp: Fixed a compile error that - came up from my previous change. + * tao/Strategies/SHMIOP_Transport.cpp: + * tao/Strategies/DIOP_Transport.cpp: Fixed a compile error that + came up from my previous change. Wed Jul 25 22:39:32 2001 Balachandran Natarajan <bala@cs.wustl.edu> - * tests/RTCORBA/Server_Protocol/server.cpp: - * tests/RTCORBA/Client_Propagated/server.cpp: - * tests/RTCORBA/Thread_Pool/server.cpp: Added checks & debugging - statments for a null RootPOA. thanks to Johnny Willemsen for - pointing this out. Did some minor cosmetic fixes. + * tests/RTCORBA/Server_Protocol/server.cpp: + * tests/RTCORBA/Client_Propagated/server.cpp: + * tests/RTCORBA/Thread_Pool/server.cpp: Added checks & debugging + statments for a null RootPOA. thanks to Johnny Willemsen for + pointing this out. Did some minor cosmetic fixes. Wed Jul 25 22:10:21 2001 Balachandran Natarajan <bala@cs.wustl.edu> - * tao/Transport.cpp: - * tao/Transport.h: Fixed a subtle problem that seems to have lead - to the Muxing tests failing randomly. The problem is something - like this - - multiple client threads can try to share a connection - - because of the above, more than one message are sent on the - same connection - - if the server is multi-threaded, the messages can be processed - concurrently - - there may be a possibility of more than two replies coming on - the same connection. - - one of the client threads can pick up both the replies - - one of the replies would be queued up and the first one can be - its own - - after queueing up the second it would wake up another thread - - if the woken up thread does not own the reply, it could just - take the reply and try to transfer ownership to the right - thread. - - before the second thread transfers the reply, teh second - thread would have resumed the handler and because of which one - of the threads would have gone into the reactor from the LF. - - at exactly the same instant the seccond thread will have - difficulty in waking up the thread on select () is it is the - owner. - Fixed this problem by not resuming the handle till we dispatch - the reply. We dont buy anything by resuming the handle before - dispatching the reply because, the dispatching will not be - unbounded. The forces that apply to the server thread, which - resumes the handle before making an upcall does not apply to the - client threads that reads and processes replies. This fix should - ideally fix the Muxing test failure on different paltforms. If - it doesnt, it will atleast prevent the race condition outlined - above :-) + * tao/Transport.cpp: + * tao/Transport.h: Fixed a subtle problem that seems to have lead + to the Muxing tests failing randomly. The problem is something + like this + - multiple client threads can try to share a connection + - because of the above, more than one message are sent on the + same connection + - if the server is multi-threaded, the messages can be processed + concurrently + - there may be a possibility of more than two replies coming on + the same connection. + - one of the client threads can pick up both the replies + - one of the replies would be queued up and the first one can be + its own + - after queueing up the second it would wake up another thread + - if the woken up thread does not own the reply, it could just + take the reply and try to transfer ownership to the right + thread. + - before the second thread transfers the reply, teh second + thread would have resumed the handler and because of which one + of the threads would have gone into the reactor from the LF. + - at exactly the same instant the seccond thread will have + difficulty in waking up the thread on select () is it is the + owner. + Fixed this problem by not resuming the handle till we dispatch + the reply. We dont buy anything by resuming the handle before + dispatching the reply because, the dispatching will not be + unbounded. The forces that apply to the server thread, which + resumes the handle before making an upcall does not apply to the + client threads that reads and processes replies. This fix should + ideally fix the Muxing test failure on different paltforms. If + it doesnt, it will atleast prevent the race condition outlined + above :-) Wed Jul 25 20:33:21 2001 Balachandran Natarajan <bala@cs.wustl.edu> - * examples/Simple/time-date/Makefile.bor: - * examples/Simple/time-date/server.bor: - * examples/Simple/time-date/time_date.bor: Fixed Borland builds - for this example. This commit is for Johnny Willemsen who is - away from his work. + * examples/Simple/time-date/Makefile.bor: + * examples/Simple/time-date/server.bor: + * examples/Simple/time-date/time_date.bor: Fixed Borland builds + for this example. This commit is for Johnny Willemsen who is + away from his work. Wed Jul 25 12:50:00 2001 Michael Kircher <Michael.Kircher@mchp.siemens.de> - * tao/Strategies/DIOP_Factory.cpp: + * tao/Strategies/DIOP_Factory.cpp: - Changed the return value of requires_explicit_endpoint () to 0 - and documented that this return code is not reflecting that - the endpoints are not cleaned-up but that we disable it by default - because DIOP is only suitable for certain use cases, e.g. it only - supports one-ways. + Changed the return value of requires_explicit_endpoint () to 0 + and documented that this return code is not reflecting that + the endpoints are not cleaned-up but that we disable it by default + because DIOP is only suitable for certain use cases, e.g. it only + supports one-ways. Wed Jul 25 08:41:39 2001 Jeff Parsons <parsons@cs.wustl.edu> diff --git a/TAO/tao/Asynch_Queued_Message.cpp b/TAO/tao/Asynch_Queued_Message.cpp index 5fdafc28735..e33e3a97ccd 100644 --- a/TAO/tao/Asynch_Queued_Message.cpp +++ b/TAO/tao/Asynch_Queued_Message.cpp @@ -59,6 +59,8 @@ TAO_Asynch_Queued_Message::fill_iov (int iovcnt_max, void TAO_Asynch_Queued_Message::bytes_transferred (size_t &byte_count) { + this->state_changed_i (TAO_LF_Event::LFS_ACTIVE); + size_t remaining_bytes = this->size_ - this->offset_; if (byte_count > remaining_bytes) { @@ -68,6 +70,9 @@ TAO_Asynch_Queued_Message::bytes_transferred (size_t &byte_count) } this->offset_ += byte_count; byte_count = 0; + + if (this->all_data_sent ()) + this->state_changed (TAO_LF_Event::LFS_SUCCESS); } void diff --git a/TAO/tao/Asynch_Reply_Dispatcher.cpp b/TAO/tao/Asynch_Reply_Dispatcher.cpp index 5f4c0b7cb6c..774452bb50a 100644 --- a/TAO/tao/Asynch_Reply_Dispatcher.cpp +++ b/TAO/tao/Asynch_Reply_Dispatcher.cpp @@ -55,11 +55,6 @@ TAO_Asynch_Reply_Dispatcher_Base::dispatch_reply ( void -TAO_Asynch_Reply_Dispatcher_Base::dispatcher_bound (TAO_Transport *) -{ -} - -void TAO_Asynch_Reply_Dispatcher_Base::connection_closed (void) { } diff --git a/TAO/tao/Asynch_Reply_Dispatcher.h b/TAO/tao/Asynch_Reply_Dispatcher.h index f81c68a142b..14adf2eb533 100644 --- a/TAO/tao/Asynch_Reply_Dispatcher.h +++ b/TAO/tao/Asynch_Reply_Dispatcher.h @@ -50,8 +50,6 @@ public: // virtual TAO_GIOP_Message_State *message_state (void); - virtual void dispatcher_bound (TAO_Transport *t); - virtual void connection_closed (void); /// Inform that the reply timed out diff --git a/TAO/tao/Exclusive_TMS.cpp b/TAO/tao/Exclusive_TMS.cpp index c0c1e3d8755..95cd9d1bd09 100644 --- a/TAO/tao/Exclusive_TMS.cpp +++ b/TAO/tao/Exclusive_TMS.cpp @@ -43,8 +43,7 @@ TAO_Exclusive_TMS::bind_dispatcher (CORBA::ULong request_id, this->request_id_ = request_id; this->rd_ = rd; - return TAO_Transport_Mux_Strategy::bind_dispatcher (request_id, - rd); + return 0; } void diff --git a/TAO/tao/Follower.cpp b/TAO/tao/Follower.cpp new file mode 100644 index 00000000000..917eabd0dca --- /dev/null +++ b/TAO/tao/Follower.cpp @@ -0,0 +1,36 @@ +// -*- C++ -*- +// $Id$ + +#include "tao/Follower.h" +#include "tao/Leader_Follower.h" + +#if !defined (__ACE_INLINE__) +# include "tao/Follower.inl" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(tao, Follower, "$Id$") + +TAO_Follower::TAO_Follower (TAO_Leader_Follower &leader_follower) + : leader_follower_ (leader_follower) + , condition_ (leader_follower.lock ()) +{ +} + +TAO_Follower::~TAO_Follower (void) +{ +} + +int +TAO_Follower::signal (void) +{ + // We *must* remove ourselves from the list of followers, otherwise + // we could get signalled twice: to wake up as a follower and as the + // next leader. + // The follower may not be there if the reply is received while + // the consumer is not yet waiting for it (i.e. it send the + // request but has not blocked to receive the reply yet). + // Ignore errors. + (void) this->leader_follower_.remove_follower (this); + + return this->condition_.signal (); +} diff --git a/TAO/tao/Follower.h b/TAO/tao/Follower.h new file mode 100644 index 00000000000..c701a8b6ce3 --- /dev/null +++ b/TAO/tao/Follower.h @@ -0,0 +1,69 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file Follower.h + * + * $Id$ + * + * @author Carlos O'Ryan <coryan@uci.edu> + */ +//============================================================================= + +#ifndef TAO_FOLLOWER_H +#define TAO_FOLLOWER_H +#include "ace/pre.h" + +#include "tao/orbconf.h" +#include "tao/TAO_Export.h" +#include "ace/Synch.h" +#include "ace/Intrusive_List_Node.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +class TAO_Leader_Follower; + +/** + * @class TAO_Follower + * + * @brief Represent a thread blocked, as a follower, in the + * Leader/Followers set. + * + * @todo Currently this class offers little abstraction, the follower + * loop should be implemented by this class. + * + */ +class TAO_Export TAO_Follower : public ACE_Intrusive_List_Node<TAO_Follower> +{ +public: + /// Constructor + TAO_Follower (TAO_Leader_Follower &leader_follower); + + /// Destructor + ~TAO_Follower (void); + + /// Access the leader follower that owns this follower + TAO_Leader_Follower &leader_follower (void); + + /// Wait until on the underlying condition variable + int wait (ACE_Time_Value *tv); + + /// Signal the underlying condition variable + int signal (void); + +private: + /// The Leader/Follower set this Follower belongs to + TAO_Leader_Follower &leader_follower_; + + /// Condition variable used to + ACE_SYNCH_CONDITION condition_; +}; + +#if defined (__ACE_INLINE__) +# include "tao/Follower.inl" +#endif /* __ACE_INLINE__ */ + +#include "ace/post.h" +#endif /* TAO_FOLLOWER_H */ diff --git a/TAO/tao/Follower.inl b/TAO/tao/Follower.inl new file mode 100644 index 00000000000..c3073b74efe --- /dev/null +++ b/TAO/tao/Follower.inl @@ -0,0 +1,13 @@ +// $Id$ + +ACE_INLINE TAO_Leader_Follower & +TAO_Follower::leader_follower (void) +{ + return this->leader_follower_; +} + +ACE_INLINE int +TAO_Follower::wait (ACE_Time_Value *tv) +{ + return this->condition_.wait (tv); +} diff --git a/TAO/tao/Follower_Auto_Ptr.cpp b/TAO/tao/Follower_Auto_Ptr.cpp new file mode 100644 index 00000000000..065120de479 --- /dev/null +++ b/TAO/tao/Follower_Auto_Ptr.cpp @@ -0,0 +1,15 @@ +// -*- C++ -*- +// $Id$ + +#include "tao/Follower_Auto_Ptr.h" + +#if !defined (__ACE_INLINE__) +# include "tao/Follower_Auto_Ptr.inl" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(tao, Follower_Auto_Ptr, "$Id$") + +TAO_LF_Follower_Auto_Ptr::~TAO_LF_Follower_Auto_Ptr (void) +{ + this->leader_follower_.release_follower (this->follower_); +} diff --git a/TAO/tao/Follower_Auto_Ptr.h b/TAO/tao/Follower_Auto_Ptr.h new file mode 100644 index 00000000000..6ed17e823d8 --- /dev/null +++ b/TAO/tao/Follower_Auto_Ptr.h @@ -0,0 +1,59 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file Follower_Auto_Ptr.h + * + * $Id$ + * + * @author Carlos O'Ryan <coryan@uci.edu> + */ +//============================================================================= + +#ifndef TAO_FOLLOWER_AUTO_PTR_H +#define TAO_FOLLOWER_AUTO_PTR_H +#include "ace/pre.h" + +#include "tao/orbconf.h" +#include "tao/Leader_Follower.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +class TAO_Follower; + +/** + * @brief Implement an auto_ptr-like class for the TAO_Followers + * allocated via a TAO_Leader_Follower set. + * + * The Leader/Follower set is a factory for TAO_Follower objects + */ +class TAO_Export TAO_LF_Follower_Auto_Ptr +{ +public: + /// Constructor + TAO_LF_Follower_Auto_Ptr (TAO_Leader_Follower &); + + /// Destructor + ~TAO_LF_Follower_Auto_Ptr (void); + + /// Implement the smart pointer methods + TAO_Follower *get (void); + TAO_Follower *operator->(void); + operator TAO_Follower *(void); + +private: + /// Keep a reference to the leader follower + TAO_Leader_Follower &leader_follower_; + + /// The follower + TAO_Follower *follower_; +}; + +#if defined (__ACE_INLINE__) +# include "tao/Follower_Auto_Ptr.inl" +#endif /* __ACE_INLINE__ */ + +#include "ace/post.h" +#endif /* TAO_FOLLOWER_AUTO_PTR_H */ diff --git a/TAO/tao/Follower_Auto_Ptr.inl b/TAO/tao/Follower_Auto_Ptr.inl new file mode 100644 index 00000000000..d468cd9dea5 --- /dev/null +++ b/TAO/tao/Follower_Auto_Ptr.inl @@ -0,0 +1,26 @@ +// $Id$ + +ACE_INLINE +TAO_LF_Follower_Auto_Ptr::TAO_LF_Follower_Auto_Ptr (TAO_Leader_Follower &lf) + : leader_follower_ (lf) + , follower_ (leader_follower_.allocate_follower ()) +{ +} + +ACE_INLINE TAO_Follower * +TAO_LF_Follower_Auto_Ptr::get (void) +{ + return this->follower_; +} + +ACE_INLINE TAO_Follower * +TAO_LF_Follower_Auto_Ptr::operator-> (void) +{ + return this->follower_; +} + +ACE_INLINE +TAO_LF_Follower_Auto_Ptr::operator TAO_Follower * (void) +{ + return this->follower_; +} diff --git a/TAO/tao/GIOP_Message_Base.cpp b/TAO/tao/GIOP_Message_Base.cpp index 3df80faebbc..ab1336b0795 100644 --- a/TAO/tao/GIOP_Message_Base.cpp +++ b/TAO/tao/GIOP_Message_Base.cpp @@ -11,6 +11,7 @@ #include "TAO_Server_Request.h" #include "GIOP_Message_Locate_Header.h" #include "Transport.h" +#include "tao/LF_Strategy.h" #if !defined (__ACE_INLINE__) # include "GIOP_Message_Base.i" diff --git a/TAO/tao/GIOP_Message_Lite.cpp b/TAO/tao/GIOP_Message_Lite.cpp index 640098ad689..a44b57f15d9 100644 --- a/TAO/tao/GIOP_Message_Lite.cpp +++ b/TAO/tao/GIOP_Message_Lite.cpp @@ -10,7 +10,8 @@ #include "tao/GIOP_Message_Locate_Header.h" #include "tao/target_specification.h" #include "tao/Leader_Follower.h" -#include "Transport.h" +#include "tao/LF_Strategy.h" +#include "tao/Transport.h" #if !defined (__ACE_INLINE__) # include "tao/GIOP_Message_Lite.i" diff --git a/TAO/tao/Invocation.cpp b/TAO/tao/Invocation.cpp index 2b63e7da11a..8b9fd565d49 100644 --- a/TAO/tao/Invocation.cpp +++ b/TAO/tao/Invocation.cpp @@ -632,7 +632,7 @@ TAO_GIOP_Synch_Invocation::invoke_i (CORBA::Boolean is_locate_request, int reply_error = this->transport_->wait_strategy ()->wait (this->max_wait_time_, - this->rd_.reply_received ()); + this->rd_); if (TAO_debug_level > 0 && this->max_wait_time_ != 0) diff --git a/TAO/tao/LF_Event.cpp b/TAO/tao/LF_Event.cpp new file mode 100644 index 00000000000..0ac59b95ca9 --- /dev/null +++ b/TAO/tao/LF_Event.cpp @@ -0,0 +1,77 @@ +// -*- C++ -*- +// $Id$ + +#include "tao/LF_Event.h" +#include "tao/Follower.h" +#include "tao/Leader_Follower.h" + +#if !defined (__ACE_INLINE__) +# include "tao/LF_Event.inl" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(tao, LF_Event, "$Id$") + +TAO_LF_Event::TAO_LF_Event (void) + : state_ (TAO_LF_Event::LFS_IDLE) + , follower_ (0) +{ +} + +TAO_LF_Event::~TAO_LF_Event (void) +{ +} + +void +TAO_LF_Event::state_changed (int new_state) +{ + if (this->follower_ == 0) + { + this->state_changed_i (new_state); + } + else + { + TAO_Leader_Follower &leader_follower = + this->follower_->leader_follower (); + + ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, leader_follower.lock ()); + + this->state_changed_i (new_state); + + this->follower_->signal (); + } +} + +void +TAO_LF_Event::state_changed_i (int new_state) +{ + if (this->state_ == new_state) + return; + + // Validate the state change + if (this->state_ == TAO_LF_Event::LFS_IDLE) + { + // From the LFS_IDLE state we can only become active. + if (new_state == TAO_LF_Event::LFS_ACTIVE) + this->state_ = new_state; + return; + } + // States other than LFS_ACTIVE are final + if (this->state_ != TAO_LF_Event::LFS_ACTIVE) + return; + + this->state_ = new_state; +} + +int +TAO_LF_Event::successful (void) const +{ + return this->state_ == TAO_LF_Event::LFS_SUCCESS; +} + +int +TAO_LF_Event::error_detected (void) const +{ + return (this->state_ == TAO_LF_Event::LFS_FAILURE + && this->state_ == TAO_LF_Event::LFS_TIMEOUT + && this->state_ == TAO_LF_Event::LFS_CONNECTION_CLOSED); +} diff --git a/TAO/tao/LF_Event.h b/TAO/tao/LF_Event.h new file mode 100644 index 00000000000..888e1240a16 --- /dev/null +++ b/TAO/tao/LF_Event.h @@ -0,0 +1,122 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file LF_Event.h + * + * $Id$ + * + * @author Carlos O'Ryan <coryan@uci.edu> + */ +//============================================================================= + +#ifndef TAO_LF_EVENT_H +#define TAO_LF_EVENT_H +#include "ace/pre.h" + +#include "corbafwd.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +class TAO_Transport; +// @todo +class TAO_Follower; + +/** + * @class TAO_LF_Event + * + * @brief Use the Leader/Follower loop to wait for one specific event. + * + * The Leader/Follower event loop is used to wait for incoming + * responses, as well as to wait for all the data to be flushed. + * This class encapsulates this event loop. It uses Template Method to + * parametrize the 'waited for' predicate (i.e. reply received or + * message sent.) + * + * @todo Implementing the Leader/Followers loop in this class, as + * well as the callbacks to communicate that an event has completed + * leads to excessive coupling. A better design would use a separate + * class to signal the events, that would allow us to remove the + * Leader/Followers logic from the ORB. However, that requires other + * major changes and it somewhat complicates the design. + * + */ +class TAO_Export TAO_LF_Event +{ +public: + /// Constructor + TAO_LF_Event (void); + + /// Destructor + virtual ~TAO_LF_Event (void); + + /// Bind a follower + /** + * An event can be waited on by at most one follower thread, this + * method is used to bind the waiting thread to the event, in order + * to let the event signal any important state changes. + * + * @return -1 if the LF_Event is already bound, 0 otherwise + */ + int bind (TAO_Follower *follower); + + //@{ + /** @name State management + * + * A Leader/Followers event goes through several states during its + * lifetime. We use an enum to represent those states and state + * changes are validated according to the rules below. + * + */ + enum { + /// The event is created, initial state can only move to + /// LFS_ACTIVE + LFS_IDLE, + /// The event is active, can change to any of the following + /// states, each of them is a final state + LFS_ACTIVE, + /// The event has completed successfully + LFS_SUCCESS, + /// A failure has been detected while the event was active + LFS_FAILURE, + /// The event has timed out + LFS_TIMEOUT, + /// The connection was closed while the state was active + LFS_CONNECTION_CLOSED + }; + + /// Change the state + void state_changed (int new_state); + + /// Return 1 if the condition was satisfied successfully, 0 if it + /// has not + int successful (void) const; + + /// Return 1 if an error was detected while waiting for the + /// event + int error_detected (void) const; + //@} + + /// Check if we should keep waiting. + int keep_waiting (void); + +protected: + /// Validate the state change + void state_changed_i (int new_state); + +private: + /// The current state + int state_; + + /// The bound follower thread + TAO_Follower *follower_; +}; + +#if defined (__ACE_INLINE__) +# include "LF_Event.inl" +#endif /* __ACE_INLINE__ */ + +#include "ace/post.h" +#endif /* TAO_LF_EVENT_H */ diff --git a/TAO/tao/LF_Event.inl b/TAO/tao/LF_Event.inl new file mode 100644 index 00000000000..200eff65967 --- /dev/null +++ b/TAO/tao/LF_Event.inl @@ -0,0 +1,16 @@ +// $Id$ + +ACE_INLINE int +TAO_LF_Event::bind (TAO_Follower *follower) +{ + if (this->follower_ != 0) + return -1; + this->follower_ = follower; + return 0; +} + +ACE_INLINE int +TAO_LF_Event::keep_waiting (void) +{ + return (this->successful () == 0) && (this->error_detected () == 0); +} diff --git a/TAO/tao/LF_Event_Loop_Thread_Helper.cpp b/TAO/tao/LF_Event_Loop_Thread_Helper.cpp new file mode 100644 index 00000000000..e2569345d13 --- /dev/null +++ b/TAO/tao/LF_Event_Loop_Thread_Helper.cpp @@ -0,0 +1,10 @@ +// -*- C++ -*- +// $Id$ + +#include "tao/LF_Event_Loop_Thread_Helper.h" + +#if !defined (__ACE_INLINE__) +# include "tao/LF_Event_Loop_Thread_Helper.inl" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(tao, LF_Event_Loop_Thread_Helper, "$Id$") diff --git a/TAO/tao/LF_Event_Loop_Thread_Helper.h b/TAO/tao/LF_Event_Loop_Thread_Helper.h new file mode 100644 index 00000000000..5edde3028b1 --- /dev/null +++ b/TAO/tao/LF_Event_Loop_Thread_Helper.h @@ -0,0 +1,62 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file LF_Event_Loop_Thread_Helper.h + * + * $Id$ + * + * @author Carlos O'Ryan <coryan@uci.edu> + */ +//============================================================================= + +#ifndef TAO_LF_EVENT_LOOP_THREAD_HELPER_H +#define TAO_LF_EVENT_LOOP_THREAD_HELPER_H +#include "ace/pre.h" + +#include "tao/orbconf.h" +#include "tao/LF_Strategy.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +/** + * @brief Helper class to enter and exit the Leader/Followers event + * loop. + * + * Uses the Guard idiom to enter and exit the Leader/Followers event + * loop. + */ +class TAO_Export TAO_LF_Event_Loop_Thread_Helper +{ +public: + /// Constructor + TAO_LF_Event_Loop_Thread_Helper (TAO_Leader_Follower &leader_follower, + TAO_LF_Strategy &lf_strategy, + ACE_Time_Value *max_wait_time); + + /// Destructor + ~TAO_LF_Event_Loop_Thread_Helper (void); + + /// Calls <set_event_loop_thread> on the leader/followers object. + int event_loop_return (void) const; + +private: + /// Reference to leader/followers object. + TAO_Leader_Follower &leader_follower_; + + /// The Leader/Follower Strategy used by this ORB. + TAO_LF_Strategy &lf_strategy_; + + /// Remembers the status returned while trying to enter the event + /// loop. + int event_loop_return_; +}; + +#if defined (__ACE_INLINE__) +# include "tao/LF_Event_Loop_Thread_Helper.inl" +#endif /* __ACE_INLINE__ */ + +#include "ace/post.h" +#endif /* TAO_LF_EVENT_LOOP_THREAD_HELPER_H */ diff --git a/TAO/tao/LF_Event_Loop_Thread_Helper.inl b/TAO/tao/LF_Event_Loop_Thread_Helper.inl new file mode 100644 index 00000000000..d3db7e44eea --- /dev/null +++ b/TAO/tao/LF_Event_Loop_Thread_Helper.inl @@ -0,0 +1,28 @@ +// $Id$ + +ACE_INLINE +TAO_LF_Event_Loop_Thread_Helper:: + TAO_LF_Event_Loop_Thread_Helper (TAO_Leader_Follower &leader_follower, + TAO_LF_Strategy &lf_strategy, + ACE_Time_Value *max_wait_time) + : leader_follower_ (leader_follower) + , lf_strategy_ (lf_strategy) +{ + this->event_loop_return_ = + this->lf_strategy_.set_event_loop_thread (max_wait_time, leader_follower_); +} + +ACE_INLINE +TAO_LF_Event_Loop_Thread_Helper::~TAO_LF_Event_Loop_Thread_Helper (void) +{ + int call_reset = (this->event_loop_return_ == 0); + this->lf_strategy_.reset_event_loop_thread (call_reset, + this->leader_follower_); +} + +ACE_INLINE int +TAO_LF_Event_Loop_Thread_Helper::event_loop_return (void) const +{ + return this->event_loop_return_; +} + diff --git a/TAO/tao/LF_Strategy.cpp b/TAO/tao/LF_Strategy.cpp new file mode 100644 index 00000000000..e1717c7baf0 --- /dev/null +++ b/TAO/tao/LF_Strategy.cpp @@ -0,0 +1,14 @@ +// -*- C++ -*- +// $Id$ + +#include "tao/LF_Strategy.h" + +#if !defined (__ACE_INLINE__) +# include "tao/LF_Strategy.inl" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(tao, LF_Strategy, "$Id$") + +TAO_LF_Strategy::~TAO_LF_Strategy (void) +{ +} diff --git a/TAO/tao/LF_Strategy.h b/TAO/tao/LF_Strategy.h new file mode 100644 index 00000000000..248f52863e2 --- /dev/null +++ b/TAO/tao/LF_Strategy.h @@ -0,0 +1,76 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file LF_Strategy.h + * + * $Id$ + * + * @author Carlos O'Ryan <coryan@uci.edu> + */ +//============================================================================= + +#ifndef TAO_LF_STRATEGY_H +#define TAO_LF_STRATEGY_H +#include "ace/pre.h" + +#include "tao/orbconf.h" +#include "tao/TAO_Export.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +class TAO_Leader_Follower; + +/** + * @brief Strategize Leader/Follower manipulations in the ORB event + * loop. + * + * The ORB event loop must participate in the Leader/Followers + * protocol, but only if that concurrency model is configured, + * otherwise performance suffers. + * + * This class strategizes the ORB behavior in this respect. + * + */ +class TAO_Export TAO_LF_Strategy +{ +public: + /// Destructor + virtual ~TAO_LF_Strategy (void); + + /// The current thread will handle an upcall + /** + * Threads that handle requests can block for long periods of time, + * causing deadlocks if they don't elect a new leader before + * starting the upcall the system can become non-responsive or + * dead-lock. + */ + virtual void set_upcall_thread (TAO_Leader_Follower &) = 0; + + /// The current thread is entering the reactor event loop + /** + * Threads that block in the reactor event loop become "server" + * threads for the Leader/Follower set. They must be flagged + * specially because they do not wait for one specific event, but + * for any event whatsoever. + */ + virtual int set_event_loop_thread (ACE_Time_Value *max_wait_time, + TAO_Leader_Follower &) = 0; + + /// The current thread is leaving the event loop + /** + * When the thread leaves the event loop a new leader must be + * elected. + */ + virtual void reset_event_loop_thread (int call_reset, + TAO_Leader_Follower &) = 0; +}; + +#if defined (__ACE_INLINE__) +# include "tao/LF_Strategy.inl" +#endif /* __ACE_INLINE__ */ + +#include "ace/post.h" +#endif /* TAO_LF_STRATEGY_H */ diff --git a/TAO/tao/LF_Strategy.inl b/TAO/tao/LF_Strategy.inl new file mode 100644 index 00000000000..74e88caa0c5 --- /dev/null +++ b/TAO/tao/LF_Strategy.inl @@ -0,0 +1,2 @@ +// $Id$ + diff --git a/TAO/tao/LF_Strategy_Complete.cpp b/TAO/tao/LF_Strategy_Complete.cpp new file mode 100644 index 00000000000..7bda47e9b16 --- /dev/null +++ b/TAO/tao/LF_Strategy_Complete.cpp @@ -0,0 +1,47 @@ +// -*- C++ -*- +// $Id$ + +#include "tao/LF_Strategy_Complete.h" +#include "tao/Leader_Follower.h" + +#if !defined (__ACE_INLINE__) +# include "tao/LF_Strategy_Complete.inl" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(tao, LF_Strategy_Complete, "$Id$") + +TAO_LF_Strategy_Complete::~TAO_LF_Strategy_Complete (void) +{ +} + +void +TAO_LF_Strategy_Complete::set_upcall_thread (TAO_Leader_Follower &lf) +{ + lf.set_upcall_thread (); +} + +int +TAO_LF_Strategy_Complete::set_event_loop_thread (ACE_Time_Value *tv, + TAO_Leader_Follower &lf) +{ + ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, lf.lock (), -1); + + return lf.set_event_loop_thread (tv); +} + +void +TAO_LF_Strategy_Complete::reset_event_loop_thread (int call_reset, + TAO_Leader_Follower &lf) +{ + ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, lf.lock ()); + + if (call_reset) + lf.reset_event_loop_thread (); + + int result = lf.elect_new_leader (); + + if (result == -1) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) Failed to wake up ") + ACE_TEXT ("a follower thread\n"))); +} diff --git a/TAO/tao/LF_Strategy_Complete.h b/TAO/tao/LF_Strategy_Complete.h new file mode 100644 index 00000000000..13084461979 --- /dev/null +++ b/TAO/tao/LF_Strategy_Complete.h @@ -0,0 +1,52 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file LF_Strategy_Complete.h + * + * $Id$ + * + * @author Carlos O'Ryan <coryan@uci.edu> + */ +//============================================================================= + +#ifndef TAO_LF_STRATEGY_COMPLETE_H +#define TAO_LF_STRATEGY_COMPLETE_H +#include "ace/pre.h" + +#include "tao/LF_Strategy.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +/** + * @brief A concrete TAO_LF_Strategy for ORB configurations that use + * the Leader/Followers event loop. + */ +class TAO_Export TAO_LF_Strategy_Complete : public TAO_LF_Strategy +{ +public: + /// Constructor + TAO_LF_Strategy_Complete (void); + + //@{ + /** @name Virtual Methods + * + * Please check the documentation in TAO_LF_Strategy + */ + virtual ~TAO_LF_Strategy_Complete (void); + + virtual void set_upcall_thread (TAO_Leader_Follower &); + virtual int set_event_loop_thread (ACE_Time_Value *max_wait_time, + TAO_Leader_Follower &); + virtual void reset_event_loop_thread (int call_reset, + TAO_Leader_Follower &); +}; + +#if defined (__ACE_INLINE__) +# include "tao/LF_Strategy_Complete.inl" +#endif /* __ACE_INLINE__ */ + +#include "ace/post.h" +#endif /* TAO_LF_STRATEGY_COMPLETE_H */ diff --git a/TAO/tao/LF_Strategy_Complete.inl b/TAO/tao/LF_Strategy_Complete.inl new file mode 100644 index 00000000000..240b3ba5dd0 --- /dev/null +++ b/TAO/tao/LF_Strategy_Complete.inl @@ -0,0 +1,6 @@ +// $Id$ + +ACE_INLINE +TAO_LF_Strategy_Complete::TAO_LF_Strategy_Complete (void) +{ +} diff --git a/TAO/tao/Leader_Follower.cpp b/TAO/tao/Leader_Follower.cpp index 342271f87a5..3ae55128b48 100644 --- a/TAO/tao/Leader_Follower.cpp +++ b/TAO/tao/Leader_Follower.cpp @@ -1,122 +1,62 @@ // $Id$ -#include "Leader_Follower.h" -#include "Resource_Factory.h" +#include "tao/Leader_Follower.h" +#include "tao/Resource_Factory.h" +#include "tao/Follower.h" +#include "tao/Follower_Auto_Ptr.h" +#include "tao/LF_Event.h" + +#include "tao/Transport.h" #include "ace/Reactor.h" #if !defined (__ACE_INLINE__) -# include "Leader_Follower.i" +# include "tao/Leader_Follower.i" #endif /* ! __ACE_INLINE__ */ ACE_RCSID(tao, Leader_Follower, "$Id$") TAO_Leader_Follower::~TAO_Leader_Follower (void) { + while (!this->follower_free_list_.empty ()) + { + TAO_Follower *follower = this->follower_free_list_.pop_front (); + delete follower; + } // Hand the reactor back to the resource factory. this->orb_core_->resource_factory ()->reclaim_reactor (this->reactor_); this->reactor_ = 0; } -TAO_Leader_Follower::TAO_Follower_Node::TAO_Follower_Node (TAO_SYNCH_CONDITION* follower_ptr) - : follower_ (follower_ptr), - next_ (0) +TAO_Follower * +TAO_Leader_Follower::allocate_follower (void) { + if (!this->follower_free_list_.empty ()) + return this->follower_free_list_.pop_front (); + return new TAO_Follower (*this); } - -TAO_Leader_Follower::TAO_Follower_Queue::TAO_Follower_Queue (void) - : head_ (0), - tail_ (0) -{ - -} - -int -TAO_Leader_Follower::TAO_Follower_Queue::insert (TAO_Follower_Node* node) +void +TAO_Leader_Follower::release_follower (TAO_Follower *follower) { - if (this->head_ == 0) { - this->head_ = node; - this->tail_ = node; - // Make sure that we don't have garbage in the case when the same node - // is added a second time. This is necessary as the nodes are on the - // stack. - node->next_ = 0; - } - else - { - // Add the node to the tail and modify the pointers - TAO_Follower_Node* temp = this->tail_; - temp->next_ = node; - this->tail_ = node; - node->next_ = 0; - } - return 0; + this->follower_free_list_.push_front (follower); } int -TAO_Leader_Follower::TAO_Follower_Queue::remove (TAO_Follower_Node* node) -{ - TAO_Follower_Node* prev = 0; - TAO_Follower_Node* curr = 0; - - // No followers in queue, return - if (this->head_ == 0) - return -1; - - // Check is for whether we have the same condition variable on the - // queue rather than the same node structure which wraps it. - for (curr = this->head_; - curr != 0 && curr->follower_ != node->follower_; - curr = curr->next_) - { - prev = curr; - } - - // Entry not found in the queue - if (curr == 0) - return -1; - // Entry found at the head of the queue - else if (prev == 0) - this->head_ = this->head_->next_; - else - prev->next_ = curr->next_; - // Entry at the tail - if (curr->next_ == 0) - this->tail_ = prev; - - return 0; -} - - -TAO_SYNCH_CONDITION* -TAO_Leader_Follower::get_next_follower (void) +TAO_Leader_Follower::elect_new_leader_i (void) { - // If the queue is empty return - if (this->follower_set_.is_empty()) - return 0; - - TAO_Follower_Node* next_follower = this->follower_set_.head_; - - TAO_SYNCH_CONDITION *cond = next_follower->follower_; + TAO_Follower* follower = + this->follower_set_.pop_front (); #if defined (TAO_DEBUG_LEADER_FOLLOWER) ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) LF::get_next_follower - " + "TAO (%P|%t) LF::elect_new_leader_i - " "follower is %x\n", - cond)); + follower)); #endif /* TAO_DEBUG_LEADER_FOLLOWER */ - // We *must* remove it when we signal it so the same condition is - // not signalled for both wake up as a follower and as the next - // leader. - // The follower may not be there if the reply is received while the - // consumer is not yet waiting for it (i.e. it send the request but - // has not blocked to receive the reply yet) - (void) this->remove_follower (next_follower); // Ignore errors.. - - return cond; + return follower->signal (); } int @@ -225,77 +165,284 @@ TAO_Leader_Follower::reset_client_thread (void) } } -TAO_LF_Strategy::TAO_LF_Strategy () +int +TAO_Leader_Follower::wait_for_event (TAO_LF_Event *event, + TAO_Transport *transport, + ACE_Time_Value *max_wait_time) { -} + // Obtain the lock. + ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock (), -1); + + // Optmize the first iteration [no access to errno] + int result = 1; + + { + // Calls this->set_client_thread () on construction and + // this->reset_client_thread () on destruction. + TAO_LF_Client_Thread_Helper client_thread_helper (*this); + ACE_UNUSED_ARG (client_thread_helper); + + ACE_Countdown_Time countdown (max_wait_time); + + // Check if there is a leader. Note that it cannot be us since we + // gave up our leadership when we became a client. + if (this->leader_available ()) + { + // = Wait as a follower. + + // Grab a follower: + TAO_LF_Follower_Auto_Ptr follower (*this); + if (follower.get () == 0) + return -1; + + if (TAO_debug_level >= 5) + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Leader_Follower::wait_for_event," + " (follower) on Transport <%d>, cond <%x>\n", + transport->id (), + follower.get ())); + + // Bound the follower and the LF_Event, this is important to + // get a signal when the event terminates + event->bind (follower.get ()); + + while (event->keep_waiting () && + this->leader_available ()) + { + // Add ourselves to the list, do it everytime we wake up + // from the CV loop. Because: + // + // - The leader thread could have elected us as the new + // leader. + // - Before we can assume the role another thread becomes + // the leader + // - But our condition variable could have been removed + // already, if we don't add it again we will never wake + // up. + // + // Notice that we can have spurious wake ups, in that case + // adding the leader results in an error, that must be + // ignored. + // You may be thinking of not removing the condition + // variable in the code that sends the signal, but + // removing it here, that does not work either, in that + // case the condition variable may be used twice: + // + // - Wake up because its reply arrived + // - Wake up because it must become the leader + // + // but only the first one has any effect, so the leader is + // lost. + // + + (void) this->add_follower (follower); + + if (max_wait_time == 0) + { + if (follower->wait (max_wait_time) == -1) + { + if (TAO_debug_level >= 5) + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Leader_Follower::wait_for_event, " + " (follower) on <%d>" + " [no timer, cond failed]\n", + transport->id ())); + + // @@ Michael: What is our error handling in this case? + // We could be elected as leader and + // no leader would come in? + return -1; + } + } + else + { + countdown.update (); + ACE_Time_Value tv = ACE_OS::gettimeofday (); + tv += *max_wait_time; + if (follower->wait (&tv) == -1) + { + if (TAO_debug_level >= 5) + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Leader_Follower::wait," + " (follower) on <%x> " + " [has timer, follower failed]\n", + transport->id ())); + + this->remove_follower (follower); + if (!event->successful ()) + { + // Remove follower can fail because either + // 1) the condition was satisfied (i.e. reply + // received or queue drained), or + // 2) somebody elected us as leader, or + // 3) the connection got closed. + // + // Therefore: + // If remove_follower fails and the condition + // was not satisfied, we know that we got + // elected as a leader. + // But we got a timeout, so we cannot become + // the leader, therefore, we have to select a + // new leader. + // + + if (this->elect_new_leader () == -1 + && TAO_debug_level > 0) + { + ACE_ERROR ((LM_ERROR, + "TAO (%P|%t) - Leader_Follower::wait_for_event," + " elect_new_leader failed\n")); + } + } + return -1; + } + } + } + + countdown.update (); + + // @@ Michael: This is an old comment why we do not want to + // remove the follower here. + // We should not remove the follower here, we *must* remove it when + // we signal it so the same condition is not signalled for + // both wake up as a follower and as the next leader. + + if (TAO_debug_level >= 5) + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Leader_Follower::wait_for_event," + " done (follower) on <%d>, successful %d\n", + transport->id (), + event->successful ())); + + // Now somebody woke us up to become a leader or to handle our + // input. We are already removed from the follower queue. + + if (event->successful ()) + return 0; + + // FALLTHROUGH + // We only get here if we woke up but the reply is not + // complete yet, time to assume the leader role.... + // i.e. ACE_ASSERT (reply_received == 0); + } + + // = Leader Code. + + // The only way to reach this point is if we must become the + // leader, because there is no leader or we have to update to a + // leader or we are doing nested upcalls in this case we do + // increase the refcount on the leader in TAO_ORB_Core. + + // Calls this->set_client_leader_thread () on + // construction and this->reset_client_leader_thread () + // on destruction. Note that this may increase the refcount of + // the leader. + TAO_LF_Client_Leader_Thread_Helper client_leader_thread_helper (*this); + ACE_UNUSED_ARG (client_leader_thread_helper); -TAO_LF_Strategy::~TAO_LF_Strategy () -{ -} + { + ACE_GUARD_RETURN (ACE_Reverse_Lock<TAO_SYNCH_MUTEX>, rev_mon, + this->reverse_lock (), -1); -TAO_Complete_LF_Strategy::TAO_Complete_LF_Strategy () -{ -} + // Become owner of the reactor. + ACE_Reactor *reactor = this->reactor_; + reactor->owner (ACE_Thread::self ()); -TAO_Complete_LF_Strategy::~TAO_Complete_LF_Strategy () -{ -} + // Run the reactor event loop. -void -TAO_Complete_LF_Strategy::set_upcall_thread (TAO_Leader_Follower &leader_follower) -{ - leader_follower.set_upcall_thread (); -} + if (TAO_debug_level >= 5) + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Leader_Follower::wait_for_event," + " (leader) enter reactor event loop on <%d>\n", + transport->id ())); -int -TAO_Complete_LF_Strategy::set_event_loop_thread (ACE_Time_Value *max_wait_time, - TAO_Leader_Follower &leader_follower) -{ - ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, leader_follower.lock (), -1); + // If we got our event, no need to run the event loop any + // further. + while (event->keep_waiting ()) + { + // Run the event loop. + result = reactor->handle_events (max_wait_time); - return leader_follower.set_event_loop_thread (max_wait_time); -} + // Did we timeout? If so, stop running the loop. + if (result == 0 && + max_wait_time != 0 && + *max_wait_time == ACE_Time_Value::zero) + break; -void -TAO_Complete_LF_Strategy::reset_event_loop_thread_and_elect_new_leader (int call_reset, - TAO_Leader_Follower &leader_follower) -{ - ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, leader_follower.lock ()); + // Other errors? If so, stop running the loop. + if (result == -1) + break; - if (call_reset) - leader_follower.reset_event_loop_thread (); + // Otherwise, keep going... + } - int result = leader_follower.elect_new_leader (); + if (TAO_debug_level >= 5) + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Leader_Follower::wait_for_event," + " (leader) exit reactor event loop on <%d>\n", + transport->id ())); + } + } + // + // End artificial scope for auto_ptr like helpers calling: + // this->reset_client_thread () and (maybe) + // this->reset_client_leader_thread (). + // + + // Wake up the next leader, we cannot do that in handle_input, + // because the woken up thread would try to get into handle_events, + // which is at the time in handle_input still occupied. But do it + // before checking the error in <result>, even if there is an error + // in our input we should continue running the loop in another + // thread. + + if (this->elect_new_leader () == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "TAO (%P|%t) - Leader_Follower::wait_for_event," + " failed to elect new leader\n"), + -1); if (result == -1) - ACE_ERROR ((LM_ERROR, - ACE_TEXT ("TAO (%P|%t) Failed to wake up ") - ACE_TEXT ("a follower thread\n"))); -} + ACE_ERROR_RETURN ((LM_ERROR, + "TAO (%P|%t) - Leader_Follower::wait_for_event," + " handle_events failed\n"), + -1); -TAO_Null_LF_Strategy::TAO_Null_LF_Strategy () -{ + // Return an error if there was a problem receiving the reply... + if (max_wait_time != 0) + { + if (!event->successful () + && *max_wait_time == ACE_Time_Value::zero) + { + result = -1; + errno = ETIME; + } + else if (event->error_detected ()) + { + // If the time did not expire yet, but we get a failure, + // e.g. the connections closed, we should still return an error. + result = -1; + } + } + else + { + result = 0; + if (event->error_detected ()) + { + result = -1; + } + } + return result; } -TAO_Null_LF_Strategy::~TAO_Null_LF_Strategy () -{ -} +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) -void -TAO_Null_LF_Strategy::set_upcall_thread (TAO_Leader_Follower &) -{ -} +template class ACE_Intrusive_List<TAO_Follower>; +template class ACE_Intrusive_List_Node<TAO_Follower>; -int -TAO_Null_LF_Strategy::set_event_loop_thread (ACE_Time_Value *, - TAO_Leader_Follower &) -{ - return 0; -} +#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) -void -TAO_Null_LF_Strategy::reset_event_loop_thread_and_elect_new_leader (int, - TAO_Leader_Follower &) -{ -} +#pragma instantiate ACE_Intrusive_List<TAO_Follower> +#pragma instantiate ACE_Intrusive_List_Node<TAO_Follower> +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/TAO/tao/Leader_Follower.h b/TAO/tao/Leader_Follower.h index 2d3b6ae5360..513566bb8fe 100644 --- a/TAO/tao/Leader_Follower.h +++ b/TAO/tao/Leader_Follower.h @@ -14,11 +14,15 @@ #include "ace/pre.h" #include "tao/ORB_Core.h" +#include "ace/Intrusive_List.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ +class TAO_Follower; +class TAO_LF_Event; + class TAO_Export TAO_Leader_Follower { public: @@ -49,6 +53,21 @@ public: /// A server thread has finished is making a request. void reset_client_thread (void); + /// Wait on the Leader/Followers loop until one event happens. + /** + * @param event The event we wait for, the loop iterates until the + * event is sucessful, or it fails due to timeout, and error or a + * connection closed. + * @param transport The transport attached to the event + * @param max_wait_time Limit the time spent on the loop + * + * @todo Document this better, split the Follower code to the + * TAO_Follower class, we probably don't need the transport object. + */ + int wait_for_event (TAO_LF_Event *event, + TAO_Transport *transport, + ACE_Time_Value *max_wait_time); + /// The current thread has become the leader thread in the /// client side leader-follower set. void set_client_leader_thread (void) ; @@ -71,39 +90,62 @@ public: */ int elect_new_leader (void); - /// Node structure for the queue of followers - struct TAO_Export TAO_Follower_Node - { - /// Constructor - TAO_Follower_Node (TAO_SYNCH_CONDITION* follower_ptr); + /** @name Follower creation/destructions + * + * The Leader/Followers set acts as a factory for the Follower + * objects. Followers are used to represent a thread blocked + * waiting in the Follower set. + * + * The Leader/Followers abstraction keeps a list of the waiting + * followers, so it can wake up one when the leader thread stops + * handling events. + * + * For performance reasons the Leader/Followers set uses a pool (or + * free-list) to keep Follower objects unattached to any thread. It + * could be tempting to use TSS to keep such followers, after all a + * thread can only need one such Follower object, however, that does + * not work with multiple Leader/Followers sets, consult this bug + * report for more details: + * + * http://ace.cs.wustl.edu/bugzilla/show_bug.cgi?id=296 + * + */ + //@{ + /// Allocate a new follower to the caller. + TAO_Follower *allocate_follower (void); - /// Follower - TAO_SYNCH_CONDITION *follower_; + /// The caller has finished using a follower. + void release_follower (TAO_Follower *); + //@} - /// Pointer to the next follower - TAO_Follower_Node *next_; - }; - /** - * adds the a follower to the set of followers in the leader- - * follower model - * returns 0 on success, -1 on failure. + /** @name Follower Set Operations + * */ - int add_follower (TAO_Follower_Node *follower_ptr); + //@{ + /// Add a new follower to the set + void add_follower (TAO_Follower *follower); - /// checks for the availablity of a follower - /// returns 1 on available, 0 else - int follower_available (void) const; + /// Removes a follower from the leader-follower set + void remove_follower (TAO_Follower *follower); - /// removes a follower from the leader-follower set - /// returns 0 on success, -1 on failure - int remove_follower (TAO_Follower_Node *follower_ptr); + /// Checks if there are any followers available + /** + * @return 1 if there follower set is not empty + */ + int follower_available (void) const; - /// returns randomly a follower from the leader-follower set - /// returns follower on success, else 0 - TAO_SYNCH_CONDITION *get_next_follower (void); + //@} - /// Accessors + /// Get a reference to the underlying mutex TAO_SYNCH_MUTEX &lock (void); + + /// Provide a pre-initialized reverse lock for the Leader/Followers + /// set. + /** + * The Leader/Followers set mutex must be release during some long + * running operations. This helper class simplifies the process of + * releasing and reacquiring said mutex. + */ ACE_Reverse_Lock<TAO_SYNCH_MUTEX> &reverse_lock (void); /// Check if there are any client threads running @@ -126,6 +168,21 @@ private: */ void reset_event_loop_thread_i (TAO_ORB_Core_TSS_Resources *tss); + /** @name Follower Set Operations + * + */ + //@{ + /// Remote a follower from the Followers set and promote it to the + /// leader role. + /** + * This is a helper routine for elect_new_leader(), after verifying + * that all the pre-conditions are satisfied the Follower set is + * changed and the promoted Follower is signaled. + */ + int elect_new_leader_i (void); + + //@} + private: /// The orb core TAO_ORB_Core *orb_core_; @@ -136,32 +193,12 @@ private: /// do protect the access to the following three members ACE_Reverse_Lock<TAO_SYNCH_MUTEX> reverse_lock_; - /// Queue to store the followers. - struct TAO_Export TAO_Follower_Queue - { - /// Constructor - TAO_Follower_Queue (void); - - /// Checks if the queue is empty. - int is_empty (void) const; - - /// Removes a follower from the queue. - int remove (TAO_Follower_Node *); + /// Implement the Leader/Followers set using an intrusive list + typedef ACE_Intrusive_List<TAO_Follower> Follower_Set; + Follower_Set follower_set_; - /// Inserts a follower into the queue. - /// Returns 0 on success, -1 for failure, 1 if the element is already - /// present. - int insert (TAO_Follower_Node *); - - /// Pointer to the head of the queue. - TAO_Follower_Node *head_; - - /// Pointer to the tail of the queue. - TAO_Follower_Node *tail_; - }; - - /// Queue to keep the followers on the stack. - TAO_Follower_Queue follower_set_; + /// Use a free list to allocate and release Follower objects + Follower_Set follower_free_list_; /** * Count the number of active leaders. @@ -217,80 +254,6 @@ private: TAO_Leader_Follower &leader_follower_; }; -class TAO_LF_Strategy; - -class TAO_Export TAO_LF_Event_Loop_Thread_Helper -{ -public: - /// Constructor - TAO_LF_Event_Loop_Thread_Helper (TAO_Leader_Follower &leader_follower, - TAO_LF_Strategy &lf_strategy); - - /// Destructor - ~TAO_LF_Event_Loop_Thread_Helper (void); - - /// Calls <set_event_loop_thread> on the leader/followers object. - int set_event_loop_thread (ACE_Time_Value *max_wait_time); - -private: - /// Reference to leader/followers object. - TAO_Leader_Follower &leader_follower_; - - TAO_LF_Strategy &lf_strategy_; - - /// Remembers whether we have to call the reset method in the - /// destructor. - int call_reset_; -}; - -class TAO_Export TAO_LF_Strategy -{ -public: - TAO_LF_Strategy (); - - virtual ~TAO_LF_Strategy (); - - virtual void set_upcall_thread (TAO_Leader_Follower &leader_follower) = 0; - - virtual int set_event_loop_thread (ACE_Time_Value *max_wait_time, - TAO_Leader_Follower &leader_follower) = 0; - - virtual void reset_event_loop_thread_and_elect_new_leader (int call_reset, - TAO_Leader_Follower &leader_follower) = 0; -}; - -class TAO_Export TAO_Complete_LF_Strategy : public TAO_LF_Strategy -{ -public: - TAO_Complete_LF_Strategy (); - - virtual ~TAO_Complete_LF_Strategy (); - - virtual void set_upcall_thread (TAO_Leader_Follower &leader_follower); - - virtual int set_event_loop_thread (ACE_Time_Value *max_wait_time, - TAO_Leader_Follower &leader_follower); - - virtual void reset_event_loop_thread_and_elect_new_leader (int call_reset, - TAO_Leader_Follower &leader_follower); -}; - -class TAO_Export TAO_Null_LF_Strategy : public TAO_LF_Strategy -{ -public: - TAO_Null_LF_Strategy (); - - virtual ~TAO_Null_LF_Strategy (); - - virtual void set_upcall_thread (TAO_Leader_Follower &leader_follower); - - virtual int set_event_loop_thread (ACE_Time_Value *max_wait_time, - TAO_Leader_Follower &leader_follower); - - virtual void reset_event_loop_thread_and_elect_new_leader (int call_reset, - TAO_Leader_Follower &leader_follower); -}; - #if defined (__ACE_INLINE__) # include "tao/Leader_Follower.i" #endif /* __ACE_INLINE__ */ diff --git a/TAO/tao/Leader_Follower.i b/TAO/tao/Leader_Follower.i index 9e0f1eeff99..3b05ae97cd5 100644 --- a/TAO/tao/Leader_Follower.i +++ b/TAO/tao/Leader_Follower.i @@ -23,15 +23,9 @@ TAO_Leader_Follower::get_tss_resources (void) const } ACE_INLINE int -TAO_Leader_Follower::TAO_Follower_Queue::is_empty (void) const -{ - return this->head_ == 0; -} - -ACE_INLINE int TAO_Leader_Follower::follower_available (void) const { - return !this->follower_set_.is_empty (); + return !this->follower_set_.empty (); } ACE_INLINE int @@ -45,9 +39,7 @@ TAO_Leader_Follower::elect_new_leader (void) } else if (this->follower_available ()) { - TAO_SYNCH_CONDITION* condition_ptr = this->get_next_follower (); - if (condition_ptr == 0 || condition_ptr->signal () == -1) - return -1; + return this->elect_new_leader_i (); } } return 0; @@ -161,16 +153,16 @@ TAO_Leader_Follower::is_client_leader_thread (void) const return tss->client_leader_thread_ != 0; } -ACE_INLINE int -TAO_Leader_Follower::add_follower (TAO_Follower_Node *follower_node) +ACE_INLINE void +TAO_Leader_Follower::add_follower (TAO_Follower *follower) { - return this->follower_set_.insert (follower_node); + this->follower_set_.push_back (follower); } -ACE_INLINE int -TAO_Leader_Follower::remove_follower (TAO_Follower_Node *follower_node) +ACE_INLINE void +TAO_Leader_Follower::remove_follower (TAO_Follower *follower) { - return this->follower_set_.remove (follower_node); + this->follower_set_.remove (follower); } ACE_INLINE ACE_Reverse_Lock<TAO_SYNCH_MUTEX> & @@ -185,7 +177,7 @@ TAO_Leader_Follower::has_clients (void) const return this->clients_; } - +// **************************************************************** ACE_INLINE TAO_LF_Client_Thread_Helper::TAO_LF_Client_Thread_Helper (TAO_Leader_Follower &leader_follower) @@ -212,30 +204,3 @@ TAO_LF_Client_Leader_Thread_Helper::~TAO_LF_Client_Leader_Thread_Helper (void) { this->leader_follower_.reset_client_leader_thread (); } - -ACE_INLINE int -TAO_LF_Event_Loop_Thread_Helper::set_event_loop_thread (ACE_Time_Value *max_wait_time) -{ - int result = this->lf_strategy_.set_event_loop_thread (max_wait_time, leader_follower_); - - if (result == 0) - this->call_reset_ = 1; - - return result; -} - -ACE_INLINE -TAO_LF_Event_Loop_Thread_Helper::TAO_LF_Event_Loop_Thread_Helper (TAO_Leader_Follower &leader_follower, - TAO_LF_Strategy &lf_strategy) - : leader_follower_ (leader_follower), - lf_strategy_ (lf_strategy), - call_reset_ (0) -{ -} - -ACE_INLINE -TAO_LF_Event_Loop_Thread_Helper::~TAO_LF_Event_Loop_Thread_Helper (void) -{ - this->lf_strategy_.reset_event_loop_thread_and_elect_new_leader (this->call_reset_, - this->leader_follower_); -} diff --git a/TAO/tao/Leader_Follower_Flushing_Strategy.cpp b/TAO/tao/Leader_Follower_Flushing_Strategy.cpp new file mode 100644 index 00000000000..152ae60c0d7 --- /dev/null +++ b/TAO/tao/Leader_Follower_Flushing_Strategy.cpp @@ -0,0 +1,61 @@ +// -*- C++ -*- +// $Id$ + +#include "tao/Leader_Follower_Flushing_Strategy.h" +#include "tao/Leader_Follower.h" +#include "tao/Transport.h" +#include "tao/ORB_Core.h" +#include "tao/Queued_Message.h" +#include "tao/debug.h" + +ACE_RCSID(tao, Leader_Follower_Flushing_Strategy, "$Id$") + +int +TAO_Leader_Follower_Flushing_Strategy::schedule_output (TAO_Transport *transport) +{ + return transport->schedule_output_i (); +} + +int +TAO_Leader_Follower_Flushing_Strategy::cancel_output (TAO_Transport *transport) +{ + return transport->cancel_output_i (); +} + +int +TAO_Leader_Follower_Flushing_Strategy::flush_message (TAO_Transport *transport, + TAO_Queued_Message *msg, + ACE_Time_Value *max_wait_time) +{ + TAO_Leader_Follower &leader_follower = + transport->orb_core ()->leader_follower (); + return leader_follower.wait_for_event (msg, transport, max_wait_time); +} + +int +TAO_Leader_Follower_Flushing_Strategy::flush_transport (TAO_Transport *transport) +{ + // @todo This is not the right way to do this.... + + ACE_DECLARE_NEW_CORBA_ENV; + ACE_TRY + { + TAO_ORB_Core *orb_core = transport->orb_core (); + + while (!transport->queue_is_empty ()) + { + int result = orb_core->run (0, 1, ACE_TRY_ENV); + ACE_TRY_CHECK; + + if (result == -1) + return -1; + } + } + ACE_CATCHANY + { + return -1; + } + ACE_ENDTRY; + + return 0; +} diff --git a/TAO/tao/Leader_Follower_Flushing_Strategy.h b/TAO/tao/Leader_Follower_Flushing_Strategy.h new file mode 100644 index 00000000000..4790f65b06a --- /dev/null +++ b/TAO/tao/Leader_Follower_Flushing_Strategy.h @@ -0,0 +1,41 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file Leader_Follower_Flushing_Strategy.h + * + * $Id$ + * + * @author Carlos O'Ryan <coryan@uci.edu> + */ +//============================================================================= + +#ifndef TAO_LEADER_FOLLOWER_FLUSHING_STRATEGY_H +#define TAO_LEADER_FOLLOWER_FLUSHING_STRATEGY_H +#include "ace/pre.h" + +#include "Flushing_Strategy.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +/** + * @class TAO_Leader_Follower_Flushing_Strategy + * + * @brief Implement a flushing strategy that uses the Leader/Follower + * set. + */ +class TAO_Export TAO_Leader_Follower_Flushing_Strategy : public TAO_Flushing_Strategy +{ +public: + virtual int schedule_output (TAO_Transport *transport); + virtual int cancel_output (TAO_Transport *transport); + virtual int flush_message (TAO_Transport *transport, + TAO_Queued_Message *msg, + ACE_Time_Value *max_wait_time); + virtual int flush_transport (TAO_Transport *transport); +}; + +#include "ace/post.h" +#endif /* TAO_LEADER_FOLLOWER_FLUSHING_STRATEGY_H */ diff --git a/TAO/tao/Makefile b/TAO/tao/Makefile index 9b9dd8ea637..385949cdb28 100644 --- a/TAO/tao/Makefile +++ b/TAO/tao/Makefile @@ -47,7 +47,13 @@ PUB_HDRS = \ Client_Strategy_Factory \ ORB_Core \ ORB_Table \ + Follower \ + LF_Event \ + LF_Event_Loop_Thread_Helper \ + LF_Strategy \ + LF_Strategy_Complete \ Leader_Follower \ + Leader_Follower_Flushing_Strategy \ Reactor_Holder \ Single_Reactor \ Wait_Strategy \ @@ -185,7 +191,14 @@ ORB_CORE_FILES = \ ORB_Core \ Stub_Factory \ ORB_Table \ + Follower \ + Follower_Auto_Ptr \ Leader_Follower \ + Leader_Follower_Flushing_Strategy \ + LF_Event \ + LF_Event_Loop_Thread_Helper \ + LF_Strategy \ + LF_Strategy_Complete \ Reactor_Registry \ params \ Resource_Factory \ diff --git a/TAO/tao/Makefile.bor b/TAO/tao/Makefile.bor index 9011376eef6..71d1f8939c6 100644 --- a/TAO/tao/Makefile.bor +++ b/TAO/tao/Makefile.bor @@ -77,6 +77,8 @@ OBJFILES = \ $(OBJDIR)\Fault_Tolerance_Service.obj \ $(OBJDIR)\FILE_Parser.obj \ $(OBJDIR)\Flushing_Strategy.obj \ + $(OBJDIR)\Follower.obj \ + $(OBJDIR)\Follower_Auto_Ptr.obj \ $(OBJDIR)\GIOP_Message_Base.obj \ $(OBJDIR)\GIOP_Message_Lite.obj \ $(OBJDIR)\GIOP_Message_Generator_Parser.obj \ @@ -108,7 +110,12 @@ OBJFILES = \ $(OBJDIR)\IOPC.obj \ $(OBJDIR)\IOR_Parser.obj \ $(OBJDIR)\IORInfo.obj \ + $(OBJDIR)\LF_Event.obj \ + $(OBJDIR)\LF_Event_Loop_Thread_Helper.obj \ + $(OBJDIR)\LF_Strategy.obj \ + $(OBJDIR)\LF_Strategy_Complete.obj \ $(OBJDIR)\Leader_Follower.obj \ + $(OBJDIR)\Leader_Follower_Flushing_Strategy.obj \ $(OBJDIR)\LocalObject.obj \ $(OBJDIR)\LRU_Connection_Purging_Strategy.obj \ $(OBJDIR)\Managed_Types.obj \ diff --git a/TAO/tao/Muxed_TMS.cpp b/TAO/tao/Muxed_TMS.cpp index 1bd476a279e..1975af86fb9 100644 --- a/TAO/tao/Muxed_TMS.cpp +++ b/TAO/tao/Muxed_TMS.cpp @@ -51,8 +51,7 @@ TAO_Muxed_TMS::bind_dispatcher (CORBA::ULong request_id, return -1; } - return TAO_Transport_Mux_Strategy::bind_dispatcher (request_id, - rd); + return 0; } void diff --git a/TAO/tao/ORB_Core.cpp b/TAO/tao/ORB_Core.cpp index 52a9996365b..38e20e95b8e 100644 --- a/TAO/tao/ORB_Core.cpp +++ b/TAO/tao/ORB_Core.cpp @@ -23,10 +23,6 @@ #include "ObjectIDList.h" -#include "ace/Object_Manager.h" -#include "ace/Env_Value_T.h" -#include "ace/Dynamic_Service.h" -#include "ace/Arg_Shifter.h" #include "Services_Activate.h" #include "Invocation.h" #include "BiDir_Adapter.h" @@ -44,6 +40,14 @@ #if (TAO_HAS_BUFFERING_CONSTRAINT_POLICY == 1) # include "Buffering_Constraint_Policy.h" #endif /* TAO_HAS_BUFFERING_CONSTRAINT_POLICY == 1 */ + +#include "tao/LF_Event_Loop_Thread_Helper.h" + +#include "ace/Object_Manager.h" +#include "ace/Env_Value_T.h" +#include "ace/Dynamic_Service.h" +#include "ace/Arg_Shifter.h" + #if defined(ACE_MVS) #include "ace/Codeset_IBM1047.h" #endif /* ACE_MVS */ @@ -1783,25 +1787,6 @@ TAO_ORB_Core::poa_adapter (void) return this->poa_adapter_; } -TAO_SYNCH_CONDITION * -TAO_ORB_Core::leader_follower_condition_variable (void) -{ - // Always using TSS. - - // Get tss key. - TAO_ORB_Core_TSS_Resources *tss = this->get_tss_resources (); - - if (tss->leader_follower_condition_variable_ == 0) - { - // Create a new one and return. - ACE_NEW_RETURN (tss->leader_follower_condition_variable_, - TAO_SYNCH_CONDITION (this->leader_follower ().lock ()), - 0); - } - - return tss->leader_follower_condition_variable_; -} - TAO_Stub * TAO_ORB_Core::create_stub(const char *repository_id, const TAO_MProfile &profiles, @@ -2100,10 +2085,11 @@ TAO_ORB_Core::run (ACE_Time_Value *tv, TAO_LF_Strategy &lf_strategy = this->lf_strategy (); - TAO_LF_Event_Loop_Thread_Helper helper (leader_follower, lf_strategy); - - result = helper.set_event_loop_thread (tv); + TAO_LF_Event_Loop_Thread_Helper helper (leader_follower, + lf_strategy, + tv); + int result = helper.event_loop_return (); if (result != 0) { if (errno == ETIME) @@ -3107,7 +3093,6 @@ TAO_ORB_Core_TSS_Resources::TAO_ORB_Core_TSS_Resources (void) transport_cache_ (0), event_loop_thread_ (0), client_leader_thread_ (0), - leader_follower_condition_variable_ (0), reactor_registry_ (0), reactor_registry_cookie_ (0), ts_objects_ (), @@ -3140,9 +3125,6 @@ TAO_ORB_Core_TSS_Resources::~TAO_ORB_Core_TSS_Resources (void) // UNIMPLEMENTED delete this->transport_cache__; this->transport_cache_ = 0; - delete this->leader_follower_condition_variable_; - this->leader_follower_condition_variable_ = 0; - if (this->reactor_registry_ != 0) this->reactor_registry_->destroy_tss_cookie ( this->reactor_registry_cookie_); diff --git a/TAO/tao/ORB_Core.h b/TAO/tao/ORB_Core.h index fa2c5b022e0..0eedb77f8fb 100644 --- a/TAO/tao/ORB_Core.h +++ b/TAO/tao/ORB_Core.h @@ -152,9 +152,6 @@ public: /// leader. int client_leader_thread_; - /// Condition variable for the leader follower model. - TAO_SYNCH_CONDITION* leader_follower_condition_variable_; - /// The Reactor Holder that we should callback when destroying the /// cookie. TAO_Reactor_Registry *reactor_registry_; @@ -669,10 +666,6 @@ public: /// threads block forever. int thread_per_connection_timeout (ACE_Time_Value &timeout) const; - /// Condition variable used in the Leader Follower Wait Strategy, on - /// which the follower thread blocks. - TAO_SYNCH_CONDITION* leader_follower_condition_variable (void); - /// Makes sure that the ORB is open and then creates a TAO_Stub /// based on the endpoint. TAO_Stub *create_stub_object (const TAO_ObjectKey &key, diff --git a/TAO/tao/ORB_Core.i b/TAO/tao/ORB_Core.i index c06290ef6ed..90e3ec6918a 100644 --- a/TAO/tao/ORB_Core.i +++ b/TAO/tao/ORB_Core.i @@ -552,7 +552,7 @@ TAO_ORB_Core::resolve_rt_orb (CORBA::Environment &ACE_TRY_ENV) if (CORBA::is_nil (this->rt_orb_.in ())) { // Save a reference to the priority mapping manager. - this->rt_orb_ = + this->rt_orb_ = this->object_ref_table ().resolve_initial_references ( TAO_OBJID_RTORB, ACE_TRY_ENV); @@ -573,7 +573,7 @@ TAO_ORB_Core::resolve_rt_current (CORBA::Environment &ACE_TRY_ENV) if (CORBA::is_nil (this->rt_current_.in ())) { // Save a reference to the priority mapping manager. - this->rt_current_ = + this->rt_current_ = this->object_ref_table ().resolve_initial_references ( TAO_OBJID_RTCURRENT, ACE_TRY_ENV); diff --git a/TAO/tao/Queued_Message.cpp b/TAO/tao/Queued_Message.cpp index 1cd30ae2e2e..68d308b2eeb 100644 --- a/TAO/tao/Queued_Message.cpp +++ b/TAO/tao/Queued_Message.cpp @@ -10,10 +10,7 @@ ACE_RCSID(tao, Queued_Message, "$Id$") TAO_Queued_Message::TAO_Queued_Message (void) - : connection_closed_ (0) - , send_failure_ (0) - , timeout_ (0) - , next_ (0) + : next_ (0) , prev_ (0) { } @@ -23,24 +20,6 @@ TAO_Queued_Message::~TAO_Queued_Message (void) } void -TAO_Queued_Message::connection_closed (void) -{ - this->connection_closed_ = 1; -} - -void -TAO_Queued_Message::send_failure (void) -{ - this->send_failure_ = 1; -} - -void -TAO_Queued_Message::timeout (void) -{ - this->timeout_ = 1; -} - -void TAO_Queued_Message::remove_from_list (TAO_Queued_Message *&head, TAO_Queued_Message *&tail) { diff --git a/TAO/tao/Queued_Message.h b/TAO/tao/Queued_Message.h index 11eb9861bfc..591aeb01b11 100644 --- a/TAO/tao/Queued_Message.h +++ b/TAO/tao/Queued_Message.h @@ -15,6 +15,7 @@ #include "ace/pre.h" #include "corbafwd.h" +#include "LF_Event.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once @@ -60,7 +61,7 @@ class ACE_Message_Block; * allocating the memory? * */ -class TAO_Export TAO_Queued_Message +class TAO_Export TAO_Queued_Message : public TAO_LF_Event { public: /// Constructor @@ -69,16 +70,6 @@ public: /// Destructor virtual ~TAO_Queued_Message (void); - /// The underlying connection has been closed, release resources and - /// signal waiting threads. - void connection_closed (void); - - /// There was an error while sending the data. - void send_failure (void); - - /// There was a timeout while sending the data - void timeout (void); - /** @name Intrusive list manipulation * * The messages are put in a doubled linked list (for easy insertion @@ -179,16 +170,6 @@ public: virtual void destroy (void) = 0; //@} -protected: - /// Set to 1 if the connection was closed - int connection_closed_; - - /// Set to 1 if there was a failure while sending the data - int send_failure_; - - /// Set to 1 if there was a timeout while sending the data - int timeout_; - private: /// Implement an intrusive double-linked list for the message queue TAO_Queued_Message *next_; diff --git a/TAO/tao/RTPortableServer/TAO_RTPortableServer.dsp b/TAO/tao/RTPortableServer/TAO_RTPortableServer.dsp index cbaa4f3f5ea..9e62aa13fe5 100644 --- a/TAO/tao/RTPortableServer/TAO_RTPortableServer.dsp +++ b/TAO/tao/RTPortableServer/TAO_RTPortableServer.dsp @@ -56,7 +56,7 @@ BSC32=bscmake.exe # ADD BSC32 /nologo
LINK32=link.exe
# ADD BASE LINK32 kernel32.lib user32.lib gdi32.lib winspool.lib comdlg32.lib advapi32.lib shell32.lib ole32.lib oleaut32.lib uuid.lib odbc32.lib odbccp32.lib /nologo /dll /machine:I386
-# ADD LINK32 ace.lib TAO.lib TAO_PortableServer.lib /nologo /dll /machine:I386 /out:"..\..\..\bin\TAO_RTPortableServer.dll" /libpath:"..\..\tao" /libpath:"..\..\..\ace" /libpath:"..\..\tao\PortableServer"
+# ADD LINK32 ace.lib TAO.lib TAO_PortableServer.lib TAO_RTCORBA.lib /nologo /dll /machine:I386 /out:"..\..\..\bin\TAO_RTPortableServer.dll" /libpath:"..\..\tao" /libpath:"..\..\..\ace" /libpath:"..\..\tao\PortableServer"
!ELSEIF "$(CFG)" == "RTPortableServer - Win32 Debug"
@@ -83,7 +83,7 @@ BSC32=bscmake.exe # ADD BSC32 /nologo
LINK32=link.exe
# ADD BASE LINK32 kernel32.lib user32.lib gdi32.lib winspool.lib comdlg32.lib advapi32.lib shell32.lib ole32.lib oleaut32.lib uuid.lib odbc32.lib odbccp32.lib /nologo /dll /debug /machine:I386 /pdbtype:sept
-# ADD LINK32 TAOd.lib aced.lib TAO_PortableServerd.lib /nologo /dll /debug /machine:I386 /out:"..\..\..\bin\TAO_RTPortableServerd.dll" /pdbtype:sept /libpath:"..\..\tao" /libpath:"..\..\..\ace" /libpath:"..\..\tao\PortableServer"
+# ADD LINK32 TAOd.lib aced.lib TAO_PortableServerd.lib TAO_RTCORBAd.lib /nologo /dll /debug /machine:I386 /out:"..\..\..\bin\TAO_RTPortableServerd.dll" /pdbtype:sept /libpath:"..\..\tao" /libpath:"..\..\..\ace" /libpath:"..\..\tao\PortableServer" /libpath:"..\..\tao\RTCORBA"
!ELSEIF "$(CFG)" == "RTPortableServer - Win32 MFC Debug"
@@ -112,7 +112,7 @@ BSC32=bscmake.exe # ADD BSC32 /nologo
LINK32=link.exe
# ADD BASE LINK32 TAOd.lib aced.lib TAO_Svc_Utilsd.lib TAO_RTEventd.lib TAO_RTSchedd.lib /nologo /dll /debug /machine:I386 /out:"..\..\..\bin\TAO_RTPORTABLESERVERd.dll" /pdbtype:sept /libpath:"..\..\tao" /libpath:"..\..\..\ace"
-# ADD LINK32 TAOmfcd.lib acemfcd.lib TAO_PortableServermfcd.lib kernel32.lib user32.lib gdi32.lib winspool.lib comdlg32.lib advapi32.lib shell32.lib ole32.lib oleaut32.lib uuid.lib odbc32.lib odbccp32.lib /nologo /dll /debug /machine:I386 /out:"..\..\..\bin\TAO_RTPortableServermfcd.dll" /pdbtype:sept /libpath:"..\..\tao" /libpath:"..\..\..\ace" /libpath:"..\..\tao\PortableServer"
+# ADD LINK32 TAO_RTCORBAmfcd.lib TAOmfcd.lib acemfcd.lib TAO_PortableServermfcd.lib kernel32.lib user32.lib gdi32.lib winspool.lib comdlg32.lib advapi32.lib shell32.lib ole32.lib oleaut32.lib uuid.lib odbc32.lib odbccp32.lib /nologo /dll /debug /machine:I386 /out:"..\..\..\bin\TAO_RTPortableServermfcd.dll" /pdbtype:sept /libpath:"..\..\tao" /libpath:"..\..\..\ace" /libpath:"..\..\tao\PortableServer"
!ELSEIF "$(CFG)" == "RTPortableServer - Win32 MFC Release"
@@ -141,7 +141,7 @@ BSC32=bscmake.exe # ADD BSC32 /nologo
LINK32=link.exe
# ADD BASE LINK32 ace.lib TAO.lib TAO_Svc_Utils.lib TAO_RTEvent.lib TAO_RTSched.lib /nologo /dll /machine:I386 /out:"..\..\..\bin\TAO_RTPORTABLESERVER.dll" /libpath:"..\..\tao" /libpath:"..\..\..\ace"
-# ADD LINK32 TAOmfc.lib acemfc.lib TAO_PortableServermfc.lib kernel32.lib user32.lib gdi32.lib winspool.lib comdlg32.lib advapi32.lib shell32.lib ole32.lib oleaut32.lib uuid.lib odbc32.lib odbccp32.lib /nologo /dll /machine:I386 /out:"..\..\..\bin\TAO_RTPortableServermfc.dll" /libpath:"..\..\tao" /libpath:"..\..\..\ace" /libpath:"..\..\tao\PortableServer"
+# ADD LINK32 TAO_RTCORBAmfc.lib TAOmfc.lib acemfc.lib TAO_PortableServermfc.lib kernel32.lib user32.lib gdi32.lib winspool.lib comdlg32.lib advapi32.lib shell32.lib ole32.lib oleaut32.lib uuid.lib odbc32.lib odbccp32.lib /nologo /dll /machine:I386 /out:"..\..\..\bin\TAO_RTPortableServermfc.dll" /libpath:"..\..\tao" /libpath:"..\..\..\ace" /libpath:"..\..\tao\PortableServer"
!ENDIF
diff --git a/TAO/tao/Reactor_Registry.cpp b/TAO/tao/Reactor_Registry.cpp index b5867a2bbc3..f707ca1e1bf 100644 --- a/TAO/tao/Reactor_Registry.cpp +++ b/TAO/tao/Reactor_Registry.cpp @@ -3,6 +3,7 @@ #include "tao/Reactor_Registry.h" #include "tao/ORB_Core.h" #include "tao/Leader_Follower.h" +#include "tao/LF_Strategy.h" #if !defined (__ACE_INLINE__) # include "tao/Reactor_Registry.i" diff --git a/TAO/tao/Reply_Dispatcher.h b/TAO/tao/Reply_Dispatcher.h index ad6ffd4c516..67df4e5d8df 100644 --- a/TAO/tao/Reply_Dispatcher.h +++ b/TAO/tao/Reply_Dispatcher.h @@ -76,13 +76,6 @@ public: // Get the Message State into which the reply has been read. /** - * The dispatcher has been bound. - * Some dispatchers need to retain state to cooperate with other - * components, such as the waiting strategy. - */ - virtual void dispatcher_bound (TAO_Transport*) = 0; - - /** * The used for the pending reply has been closed. * No reply is expected. * @@ TODO: If the connection was closed due to a CloseConnection diff --git a/TAO/tao/Strategies/LF_Strategy_Null.cpp b/TAO/tao/Strategies/LF_Strategy_Null.cpp new file mode 100644 index 00000000000..d3c740cb9de --- /dev/null +++ b/TAO/tao/Strategies/LF_Strategy_Null.cpp @@ -0,0 +1,32 @@ +// -*- C++ -*- +// $Id$ + +#include "LF_Strategy_Null.h" + +#if !defined (__ACE_INLINE__) +# include "LF_Strategy_Null.inl" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(tao, LF_Strategy_Null, "$Id$") + +TAO_LF_Strategy_Null::~TAO_LF_Strategy_Null (void) +{ +} + +void +TAO_LF_Strategy_Null::set_upcall_thread (TAO_Leader_Follower &) +{ +} + +int +TAO_LF_Strategy_Null::set_event_loop_thread (ACE_Time_Value *, + TAO_Leader_Follower &) +{ + return 0; +} + +void +TAO_LF_Strategy_Null::reset_event_loop_thread (int, + TAO_Leader_Follower &) +{ +} diff --git a/TAO/tao/Strategies/LF_Strategy_Null.h b/TAO/tao/Strategies/LF_Strategy_Null.h new file mode 100644 index 00000000000..a9d54a5348a --- /dev/null +++ b/TAO/tao/Strategies/LF_Strategy_Null.h @@ -0,0 +1,53 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file LF_Strategy_Null.h + * + * $Id$ + * + * @author Carlos O'Ryan <coryan@uci.edu> + */ +//============================================================================= + +#ifndef TAO_LF_STRATEGY_NULL_H +#define TAO_LF_STRATEGY_NULL_H +#include "ace/pre.h" + +#include "strategies_export.h" +#include "tao/LF_Strategy.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +/** + * @brief A concrete TAO_LF_Strategy for ORB configurations that do + * not use the Leader/Followers event loop. + */ +class TAO_Strategies_Export TAO_LF_Strategy_Null : public TAO_LF_Strategy +{ +public: + /// Constructor + TAO_LF_Strategy_Null (void); + + //@{ + /** @name Virtual Methods + * + * Please check the documentation in TAO_LF_Strategy + */ + virtual ~TAO_LF_Strategy_Null (void); + + virtual void set_upcall_thread (TAO_Leader_Follower &); + virtual int set_event_loop_thread (ACE_Time_Value *max_wait_time, + TAO_Leader_Follower &); + virtual void reset_event_loop_thread (int call_reset, + TAO_Leader_Follower &); +}; + +#if defined (__ACE_INLINE__) +# include "LF_Strategy_Null.inl" +#endif /* __ACE_INLINE__ */ + +#include "ace/post.h" +#endif /* TAO_LF_STRATEGY_NULL_H */ diff --git a/TAO/tao/Strategies/LF_Strategy_Null.inl b/TAO/tao/Strategies/LF_Strategy_Null.inl new file mode 100644 index 00000000000..c073be304e7 --- /dev/null +++ b/TAO/tao/Strategies/LF_Strategy_Null.inl @@ -0,0 +1,6 @@ +// $Id$ + +ACE_INLINE +TAO_LF_Strategy_Null::TAO_LF_Strategy_Null (void) +{ +} diff --git a/TAO/tao/Strategies/Makefile b/TAO/tao/Strategies/Makefile index 671149799a7..115826e059c 100644 --- a/TAO/tao/Strategies/Makefile +++ b/TAO/tao/Strategies/Makefile @@ -52,7 +52,8 @@ CPP_SRCS += \ Reactor_Per_Priority \ LFU_Connection_Purging_Strategy \ FIFO_Connection_Purging_Strategy \ - NULL_Connection_Purging_Strategy + NULL_Connection_Purging_Strategy \ + LF_Strategy_Null IDL_SRC = \ $(addsuffix S.cpp, $(IDL_FILES)) \ diff --git a/TAO/tao/Strategies/Makefile.bor b/TAO/tao/Strategies/Makefile.bor index 83338364c48..30599f4f7ac 100644 --- a/TAO/tao/Strategies/Makefile.bor +++ b/TAO/tao/Strategies/Makefile.bor @@ -33,7 +33,8 @@ OBJFILES = \ $(OBJDIR)\Reactor_Per_Priority.obj \ $(OBJDIR)\FIFO_Connection_Purging_Strategy.obj \ $(OBJDIR)\LFU_Connection_Purging_Strategy.obj \ - $(OBJDIR)\NULL_Connection_Purging_Strategy.obj + $(OBJDIR)\NULL_Connection_Purging_Strategy.obj \ + $(OBJDIR)\LF_Strategy_Null.obj !ifdef STATIC CFLAGS = $(ACE_CFLAGS) $(TAO_CFLAGS) $(TAO_STRATEGIES_CFLAGS) diff --git a/TAO/tao/Strategies/TAO_Strategies.dsp b/TAO/tao/Strategies/TAO_Strategies.dsp index f2c7e384275..975147e7a47 100644 --- a/TAO/tao/Strategies/TAO_Strategies.dsp +++ b/TAO/tao/Strategies/TAO_Strategies.dsp @@ -130,6 +130,10 @@ SOURCE=.\FIFO_Connection_Purging_Strategy.cpp # End Source File
# Begin Source File
+SOURCE=.\LF_Strategy_Null.cpp
+# End Source File
+# Begin Source File
+
SOURCE=.\LFU_Connection_Purging_Strategy.cpp
# End Source File
# Begin Source File
@@ -246,6 +250,14 @@ SOURCE=.\DIOP_Transport.h # End Source File
# Begin Source File
+SOURCE=.\LF_Strategy_Null.h
+# End Source File
+# Begin Source File
+
+SOURCE=.\LF_Strategy_Null.inl
+# End Source File
+# Begin Source File
+
SOURCE=.\Reactor_Per_Priority.h
# End Source File
# Begin Source File
diff --git a/TAO/tao/Strategies/TAO_Strategies_Static.dsp b/TAO/tao/Strategies/TAO_Strategies_Static.dsp index 5402fc26f2a..24e09b71cb9 100644 --- a/TAO/tao/Strategies/TAO_Strategies_Static.dsp +++ b/TAO/tao/Strategies/TAO_Strategies_Static.dsp @@ -127,6 +127,10 @@ SOURCE=.\FIFO_Connection_Purging_Strategy.cpp # End Source File
# Begin Source File
+SOURCE=.\LF_Strategy_Null.cpp
+# End Source File
+# Begin Source File
+
SOURCE=.\LFU_Connection_Purging_Strategy.cpp
# End Source File
# Begin Source File
@@ -243,6 +247,14 @@ SOURCE=.\DIOP_Transport.h # End Source File
# Begin Source File
+SOURCE=.\LF_Strategy_Null.h
+# End Source File
+# Begin Source File
+
+SOURCE=.\LF_Strategy_Null.inl
+# End Source File
+# Begin Source File
+
SOURCE=.\Reactor_Per_Priority.h
# End Source File
# Begin Source File
diff --git a/TAO/tao/Strategies/advanced_resource.cpp b/TAO/tao/Strategies/advanced_resource.cpp index 76f962f8732..04bc164a7bb 100644 --- a/TAO/tao/Strategies/advanced_resource.cpp +++ b/TAO/tao/Strategies/advanced_resource.cpp @@ -15,9 +15,12 @@ #include "FIFO_Connection_Purging_Strategy.h" #include "NULL_Connection_Purging_Strategy.h" +#include "LF_Strategy_Null.h" + #include "tao/debug.h" #include "tao/Single_Reactor.h" #include "tao/LRU_Connection_Purging_Strategy.h" +#include "tao/Complete_LF_Strategy.h" #include "tao/Leader_Follower.h" #include "tao/StringSeqC.h" @@ -82,7 +85,7 @@ TAO_Advanced_Resource_Factory::init (int argc, char **argv) return 0; } this->options_processed_ = 1; - + // If the default resource factory exists, then disable it. // This causes any directives for the "Resource_Factory" to // report warnings. @@ -101,7 +104,7 @@ TAO_Advanced_Resource_Factory::init (int argc, char **argv) unused_argv.length (argc); for (curarg = 0; curarg < argc; curarg++) - { + { if (ACE_OS::strcasecmp (argv[curarg], "-ORBReactorRegistry") == 0) { @@ -222,10 +225,10 @@ TAO_Advanced_Resource_Factory::init (int argc, char **argv) unused_argc++; } } - + unused_argv.length (unused_argc); // "Trim" the string sequence to // the actual size. - + this->TAO_Default_Resource_Factory::init (unused_argc, unused_argv.get_buffer ()); return 0; @@ -711,13 +714,13 @@ TAO_Advanced_Resource_Factory::create_lf_strategy (void) if (this->reactor_type_ == TAO_REACTOR_SELECT_ST) { ACE_NEW_RETURN (strategy, - TAO_Null_LF_Strategy, + TAO_LF_Strategy_Null, 0); } else { ACE_NEW_RETURN (strategy, - TAO_Complete_LF_Strategy, + TAO_LF_Strategy_Complete, 0); } return strategy; diff --git a/TAO/tao/Synch_Queued_Message.cpp b/TAO/tao/Synch_Queued_Message.cpp index a76d964115c..8381449928d 100644 --- a/TAO/tao/Synch_Queued_Message.cpp +++ b/TAO/tao/Synch_Queued_Message.cpp @@ -65,6 +65,8 @@ TAO_Synch_Queued_Message::fill_iov (int iovcnt_max, void TAO_Synch_Queued_Message::bytes_transferred (size_t &byte_count) { + this->state_changed_i (TAO_LF_Event::LFS_ACTIVE); + while (this->current_block_ != 0 && byte_count > 0) { size_t l = this->current_block_->length (); @@ -84,6 +86,8 @@ TAO_Synch_Queued_Message::bytes_transferred (size_t &byte_count) this->current_block_ = this->current_block_->cont (); } } + if (this->current_block_ == 0) + this->state_changed (TAO_LF_Event::LFS_SUCCESS); } void diff --git a/TAO/tao/Synch_Reply_Dispatcher.cpp b/TAO/tao/Synch_Reply_Dispatcher.cpp index 609fa947b29..752e7143295 100644 --- a/TAO/tao/Synch_Reply_Dispatcher.cpp +++ b/TAO/tao/Synch_Reply_Dispatcher.cpp @@ -2,9 +2,8 @@ #include "tao/Synch_Reply_Dispatcher.h" #include "tao/ORB_Core.h" -#include "tao/Wait_Strategy.h" #include "tao/Pluggable_Messaging_Utils.h" -#include "Transport.h" +#include "tao/Transport.h" ACE_RCSID(tao, Synch_Reply_Dispatcher, "$Id$") @@ -17,7 +16,6 @@ TAO_Synch_Reply_Dispatcher::TAO_Synch_Reply_Dispatcher ( : reply_service_info_ (sc), reply_received_ (0), orb_core_ (orb_core), - wait_strategy_ (0), db_ (sizeof buf_, ACE_Message_Block::MB_DATA, this->buf_, @@ -30,9 +28,10 @@ TAO_Synch_Reply_Dispatcher::TAO_Synch_Reply_Dispatcher ( TAO_ENCAP_BYTE_ORDER, TAO_DEF_GIOP_MAJOR, TAO_DEF_GIOP_MINOR, - orb_core), - leader_follower_condition_variable_ (0) + orb_core) { + // As a TAO_LF_Event we start in the active state.... + this->state_changed_i (TAO_LF_Event::LFS_ACTIVE); } // Destructor. @@ -70,42 +69,23 @@ TAO_Synch_Reply_Dispatcher::dispatch_reply ( //this->message_state_.reset (0); // Transfer the <params.input_cdr_>'s content to this->reply_cdr_ + // @@ Somebody could please explain why we ignore the value + // returned? And why don't we simply use the normal C++ idioms to + // represent that? Namely: + // (void) this->reply_cdr_.close_from (params.input_cdr_); + // ACE_Data_Block *db = this->reply_cdr_.clone_from (params.input_cdr_); ACE_UNUSED_ARG (db); - if (this->wait_strategy_ != 0) - { - if (this->wait_strategy_->reply_dispatched ( - this->reply_received_, - this->leader_follower_condition_variable_ - ) - == -1) - { - return -1; - } - } + this->state_changed (TAO_LF_Event::LFS_SUCCESS); return 1; } void -TAO_Synch_Reply_Dispatcher::dispatcher_bound (TAO_Transport *transport) -{ - this->wait_strategy_ = transport->wait_strategy (); - this->leader_follower_condition_variable_ = - transport->wait_strategy ()->leader_follower_condition_variable (); -} - -void TAO_Synch_Reply_Dispatcher::connection_closed (void) { - if (this->wait_strategy_ != 0) - { - this->wait_strategy_->connection_closed ( - this->reply_received_, - this->leader_follower_condition_variable_ - ); - } + this->state_changed (TAO_LF_Event::LFS_CONNECTION_CLOSED); } diff --git a/TAO/tao/Synch_Reply_Dispatcher.h b/TAO/tao/Synch_Reply_Dispatcher.h index ec6aabe157b..e8ca72afc64 100644 --- a/TAO/tao/Synch_Reply_Dispatcher.h +++ b/TAO/tao/Synch_Reply_Dispatcher.h @@ -20,13 +20,13 @@ #include "ace/pre.h" #include "tao/Reply_Dispatcher.h" +#include "tao/LF_Event.h" #include "tao/GIOP_Message_Version.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ -class TAO_Wait_Strategy; class TAO_Pluggable_Reply_Params; /** @@ -35,7 +35,9 @@ class TAO_Pluggable_Reply_Params; * @brief Reply dispatcher for Synchoronous Method Invocation (SMI)s. * */ -class TAO_Export TAO_Synch_Reply_Dispatcher : public TAO_Reply_Dispatcher +class TAO_Export TAO_Synch_Reply_Dispatcher + : public TAO_Reply_Dispatcher + , public TAO_LF_Event { public: @@ -54,11 +56,6 @@ public: virtual int dispatch_reply (TAO_Pluggable_Reply_Params ¶ms); - // Commented for the time being - Bala - // virtual TAO_GIOP_Message_State *message_state (void); - - virtual void dispatcher_bound (TAO_Transport *); - virtual void connection_closed (void); protected: @@ -72,10 +69,6 @@ private: /// Cache the ORB Core pointer. TAO_ORB_Core *orb_core_; - /// Save the wait strategy to signal the waiting threads (if - /// appropriate). - TAO_Wait_Strategy *wait_strategy_; - /* @@todo: At some point of time we are going to get to a situation where TAO has huge stack sizes. Need to think on how we would deal with that. One idea would be to push these things on TSS as @@ -93,14 +86,6 @@ private: /// CDR stream which has the reply information that needs to be /// demarshalled by the stubs TAO_InputCDR reply_cdr_; - - /** - * The condition variable used to signal the waiting thread in the - * Leader/Followers model. The variable is acquired in the thread - * that binds the Reply_Dispatcher to its transport, and then passed - * to the Waiting_Strategy to do the signalling, if needed. - */ - TAO_SYNCH_CONDITION *leader_follower_condition_variable_; }; #include "ace/post.h" diff --git a/TAO/tao/TAO.dsp b/TAO/tao/TAO.dsp index b319103ab57..63e23ad34fa 100644 --- a/TAO/tao/TAO.dsp +++ b/TAO/tao/TAO.dsp @@ -387,6 +387,14 @@ SOURCE=.\Flushing_Strategy.cpp # End Source File
# Begin Source File
+SOURCE=.\Follower.cpp
+# End Source File
+# Begin Source File
+
+SOURCE=.\Follower_Auto_Ptr.cpp
+# End Source File
+# Begin Source File
+
SOURCE=.\GIOP_Message_Base.cpp
# End Source File
# Begin Source File
@@ -515,6 +523,18 @@ SOURCE=.\Leader_Follower.cpp # End Source File
# Begin Source File
+SOURCE=.\Leader_Follower_Flushing_Strategy.cpp
+# End Source File
+# Begin Source File
+
+SOURCE=.\LF_Event_Loop_Thread_Helper.cpp
+# End Source File
+# Begin Source File
+
+SOURCE=.\LF_Strategy.cpp
+# End Source File
+# Begin Source File
+
SOURCE=.\LocalObject.cpp
# End Source File
# Begin Source File
@@ -1119,6 +1139,22 @@ SOURCE=.\Flushing_Strategy.h # End Source File
# Begin Source File
+SOURCE=.\Follower.h
+# End Source File
+# Begin Source File
+
+SOURCE=.\Follower.inl
+# End Source File
+# Begin Source File
+
+SOURCE=.\Follower_Auto_Ptr.h
+# End Source File
+# Begin Source File
+
+SOURCE=.\Follower_Auto_Ptr.inl
+# End Source File
+# Begin Source File
+
SOURCE=.\giop.h
# End Source File
# Begin Source File
@@ -1267,6 +1303,26 @@ SOURCE=.\Leader_Follower.h # End Source File
# Begin Source File
+SOURCE=.\Leader_Follower_Flushing_Strategy.h
+# End Source File
+# Begin Source File
+
+SOURCE=.\LF_Event_Loop_Thread_Helper.h
+# End Source File
+# Begin Source File
+
+SOURCE=.\LF_Event_Loop_Thread_Helper.inl
+# End Source File
+# Begin Source File
+
+SOURCE=.\LF_Strategy.h
+# End Source File
+# Begin Source File
+
+SOURCE=.\LF_Strategy.inl
+# End Source File
+# Begin Source File
+
SOURCE=.\Linear_Priority_Mapping.h
# End Source File
# Begin Source File
@@ -1963,6 +2019,30 @@ SOURCE=.\Leader_Follower.i # End Source File
# Begin Source File
+SOURCE=.\LF_Event.cpp
+# End Source File
+# Begin Source File
+
+SOURCE=.\LF_Event.h
+# End Source File
+# Begin Source File
+
+SOURCE=.\LF_Event.inl
+# End Source File
+# Begin Source File
+
+SOURCE=.\LF_Strategy_Complete.cpp
+# End Source File
+# Begin Source File
+
+SOURCE=.\LF_Strategy_Complete.h
+# End Source File
+# Begin Source File
+
+SOURCE=.\LF_Strategy_Complete.inl
+# End Source File
+# Begin Source File
+
SOURCE=.\Linear_Priority_Mapping.i
# End Source File
# Begin Source File
diff --git a/TAO/tao/TAO.dsw b/TAO/tao/TAO.dsw index 33302b448de..6def02f39b4 100644 --- a/TAO/tao/TAO.dsw +++ b/TAO/tao/TAO.dsw @@ -219,6 +219,66 @@ Package=<4> ###############################################################################
+Project: "RTCORBA"=.\RTCORBA\TAO_RTCORBA.dsp - Package Owner=<4>
+
+Package=<5>
+{{{
+}}}
+
+Package=<4>
+{{{
+ Begin Project Dependency
+ Project_Dep_Name TAO DLL
+ End Project Dependency
+}}}
+
+###############################################################################
+
+Project: "RTCORBA_Static"=.\RTCORBA\TAO_RTCORBA_Static.dsp - Package Owner=<4>
+
+Package=<5>
+{{{
+}}}
+
+Package=<4>
+{{{
+}}}
+
+###############################################################################
+
+Project: "RTPortableServer"=.\RTPortableServer\TAO_RTPortableServer.dsp - Package Owner=<4>
+
+Package=<5>
+{{{
+}}}
+
+Package=<4>
+{{{
+ Begin Project Dependency
+ Project_Dep_Name PortableServer
+ End Project Dependency
+ Begin Project Dependency
+ Project_Dep_Name TAO DLL
+ End Project Dependency
+ Begin Project Dependency
+ Project_Dep_Name RTCORBA
+ End Project Dependency
+}}}
+
+###############################################################################
+
+Project: "RTPortableServer_Static"=.\RTPortableServer\TAO_RTPortableServer_Static.dsp - Package Owner=<4>
+
+Package=<5>
+{{{
+}}}
+
+Package=<4>
+{{{
+}}}
+
+###############################################################################
+
Project: "SmartProxies"=.\SmartProxies\SmartProxies.dsp - Package Owner=<4>
Package=<5>
@@ -291,7 +351,7 @@ Package=<4> ###############################################################################
-Project: "TypeCodeFactory_Static"=.\TYPECODEFACTORY\TypeCodeFactory_Static\TypeCodeFactory_Static.dsp - Package Owner=<4>
+Project: "TypeCodeFactory_Static"=H:\fix_886\ACE_wrappers\TAO\tao\TypeCodeFactory\TypeCodeFactory_Static.dsp - Package Owner=<4>
Package=<5>
{{{
diff --git a/TAO/tao/TAO_Static.dsp b/TAO/tao/TAO_Static.dsp index 5353de0a536..4ace1c229eb 100644 --- a/TAO/tao/TAO_Static.dsp +++ b/TAO/tao/TAO_Static.dsp @@ -40,8 +40,8 @@ RSC=rc.exe # PROP Output_Dir ""
# PROP Intermediate_Dir "LIB\Release"
# PROP Target_Dir ""
-MTL=midl.exe
LINK32=link.exe -lib
+MTL=midl.exe
# ADD BASE CPP /nologo /W3 /GX /O2 /D "WIN32" /D "NDEBUG" /D "_WINDOWS" /YX /FD /c
# ADD CPP /nologo /MD /W3 /GX /O2 /I "../../" /I "../" /D "_WINDOWS" /D "_CONSOLE" /D "NDEBUG" /D "WIN32" /D "TAO_AS_STATIC_LIBS" /D "ACE_AS_STATIC_LIBS" /FD /c
# SUBTRACT CPP /YX
@@ -66,8 +66,8 @@ LIB32=link.exe -lib # PROP Output_Dir ""
# PROP Intermediate_Dir "LIB\Debug"
# PROP Target_Dir ""
-MTL=midl.exe
LINK32=link.exe -lib
+MTL=midl.exe
# ADD BASE CPP /nologo /W3 /GX /Z7 /Od /D "WIN32" /D "_DEBUG" /D "_WINDOWS" /YX /FD /c
# ADD CPP /nologo /MDd /W3 /GX /Zi /Od /I "../../" /I "../" /D "_WINDOWS" /D "_CONSOLE" /D "_DEBUG" /D "WIN32" /D "ACE_AS_STATIC_LIBS" /D "TAO_AS_STATIC_LIBS" /FD /c
# SUBTRACT CPP /YX
@@ -467,6 +467,22 @@ SOURCE=.\Leader_Follower.h # End Source File
# Begin Source File
+SOURCE=.\LF_Event.h
+# End Source File
+# Begin Source File
+
+SOURCE=.\LF_Event_Loop_Thread_Helper.h
+# End Source File
+# Begin Source File
+
+SOURCE=.\LF_Strategy.h
+# End Source File
+# Begin Source File
+
+SOURCE=.\LF_Strategy_Complete.h
+# End Source File
+# Begin Source File
+
SOURCE=.\LocalObject.h
# End Source File
# Begin Source File
@@ -1151,6 +1167,22 @@ SOURCE=.\Leader_Follower.i # End Source File
# Begin Source File
+SOURCE=.\LF_Event.inl
+# End Source File
+# Begin Source File
+
+SOURCE=.\LF_Event_Loop_Thread_Helper.inl
+# End Source File
+# Begin Source File
+
+SOURCE=.\LF_Strategy.inl
+# End Source File
+# Begin Source File
+
+SOURCE=.\LF_Strategy_Complete.inl
+# End Source File
+# Begin Source File
+
SOURCE=.\LocalObject.i
# End Source File
# Begin Source File
@@ -1787,6 +1819,22 @@ SOURCE=.\Leader_Follower.cpp # End Source File
# Begin Source File
+SOURCE=.\LF_Event.cpp
+# End Source File
+# Begin Source File
+
+SOURCE=.\LF_Event_Loop_Thread_Helper.cpp
+# End Source File
+# Begin Source File
+
+SOURCE=.\LF_Strategy.cpp
+# End Source File
+# Begin Source File
+
+SOURCE=.\LF_Strategy_Complete.cpp
+# End Source File
+# Begin Source File
+
SOURCE=.\LocalObject.cpp
# End Source File
# Begin Source File
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp index 020affe6c9a..fe458896574 100644 --- a/TAO/tao/Transport.cpp +++ b/TAO/tao/Transport.cpp @@ -107,7 +107,7 @@ TAO_Transport::~TAO_Transport (void) { // @@ This is a good point to insert a flag to indicate that a // CloseConnection message was successfully received. - i->connection_closed (); + i->state_changed (TAO_LF_Event::LFS_CONNECTION_CLOSED); TAO_Queued_Message *tmp = i; i = i->next (); @@ -558,12 +558,6 @@ TAO_Transport::idle_after_reply (void) return this->tms ()->idle_after_reply (); } -TAO_SYNCH_CONDITION * -TAO_Transport::leader_follower_condition_variable (void) -{ - return this->wait_strategy ()->leader_follower_condition_variable (); -} - int TAO_Transport::tear_listen_point_list (TAO_InputCDR &) { @@ -705,7 +699,7 @@ TAO_Transport::close_connection_i (void) for (TAO_Queued_Message *i = this->head_; i != 0; i = i->next ()) { - i->connection_closed (); + i->state_changed (TAO_LF_Event::LFS_CONNECTION_CLOSED); } } diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h index 93e4ee7d7a4..20035d38eb6 100644 --- a/TAO/tao/Transport.h +++ b/TAO/tao/Transport.h @@ -252,13 +252,6 @@ public: int handle_output (void); /** - * Return the TSS leader follower condition variable used in the - * Wait Strategy. Muxed Leader Follower implementation returns a - * valid condition variable, others return 0. - */ - virtual TAO_SYNCH_CONDITION *leader_follower_condition_variable (void); - - /** * Initialising the messaging object. This would be used by the * connector side. On the acceptor side the connection handler * would take care of the messaging objects. @@ -771,10 +764,11 @@ private: /// A helper routine used in drain_queue_i() int drain_queue_helper (int &iovcnt, iovec iov[]); - /// This class needs privileged access to: + /// These classes need privileged access to: /// - schedule_output_i() /// - cancel_output_i() friend class TAO_Reactive_Flushing_Strategy; + friend class TAO_Leader_Follower_Flushing_Strategy; /// Schedule handle_output() callbacks int schedule_output_i (void); diff --git a/TAO/tao/Transport_Mux_Strategy.cpp b/TAO/tao/Transport_Mux_Strategy.cpp index f6c93ddfa4c..2997e5c70d8 100644 --- a/TAO/tao/Transport_Mux_Strategy.cpp +++ b/TAO/tao/Transport_Mux_Strategy.cpp @@ -13,17 +13,3 @@ TAO_Transport_Mux_Strategy::TAO_Transport_Mux_Strategy (TAO_Transport *transport TAO_Transport_Mux_Strategy::~TAO_Transport_Mux_Strategy (void) { } - - -int -TAO_Transport_Mux_Strategy::bind_dispatcher (CORBA::ULong, - TAO_Reply_Dispatcher *rd) -{ - rd->dispatcher_bound (this->transport_); - return 0; -} - -void -TAO_Transport_Mux_Strategy::unbind_dispatcher (CORBA::ULong) -{ -} diff --git a/TAO/tao/Transport_Mux_Strategy.h b/TAO/tao/Transport_Mux_Strategy.h index 5937ff16e9a..d58f8b0fd11 100644 --- a/TAO/tao/Transport_Mux_Strategy.h +++ b/TAO/tao/Transport_Mux_Strategy.h @@ -53,7 +53,7 @@ public: /// Bind the dispatcher with the request id. Commonalities in the /// derived class implementations is kept here. virtual int bind_dispatcher (CORBA::ULong request_id, - TAO_Reply_Dispatcher *rd); + TAO_Reply_Dispatcher *rd) = 0; /** * Unbind the dispatcher, the client is no longer waiting for the @@ -62,7 +62,7 @@ public: * request. * A later reply for that request should be ignored. */ - virtual void unbind_dispatcher (CORBA::ULong request_id); + virtual void unbind_dispatcher (CORBA::ULong request_id) = 0; /// Dispatch the reply for <request_id>, cleanup any resources /// allocated for that request. diff --git a/TAO/tao/Wait_On_Leader_Follower.cpp b/TAO/tao/Wait_On_Leader_Follower.cpp index d614abbafbc..422bc57b541 100644 --- a/TAO/tao/Wait_On_Leader_Follower.cpp +++ b/TAO/tao/Wait_On_Leader_Follower.cpp @@ -3,8 +3,9 @@ #include "tao/Wait_On_Leader_Follower.h" #include "tao/ORB_Core.h" #include "tao/Leader_Follower.h" +#include "tao/Transport.h" +#include "tao/Synch_Reply_Dispatcher.h" #include "tao/debug.h" -#include "Transport.h" ACE_RCSID(tao, Wait_On_Leader_Follower, "$Id$") @@ -47,340 +48,11 @@ TAO_Wait_On_Leader_Follower::sending_request (TAO_ORB_Core *orb_core, int TAO_Wait_On_Leader_Follower::wait (ACE_Time_Value *max_wait_time, - int &reply_received) + TAO_Synch_Reply_Dispatcher &rd) { - // Cache the ORB core, it won't change and is used multiple times - // below: - TAO_ORB_Core* orb_core = - this->transport_->orb_core (); - - TAO_Leader_Follower& leader_follower = - orb_core->leader_follower (); - - // Obtain the lock. - ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, - leader_follower.lock (), -1); - - // Optmize the first iteration [no access to errno] - int result = 1; - - // - // Begin artificial scope for auto_ptr like helpers calling: - // leader_follower.set_client_thread () and (maybe later on) - // leader_follower.set_client_leader_thread (). - // - { - // Calls leader_follower.set_client_thread () on construction and - // leader_follower.reset_client_thread () on destruction. - TAO_LF_Client_Thread_Helper client_thread_helper (leader_follower); - ACE_UNUSED_ARG (client_thread_helper); - - ACE_Countdown_Time countdown (max_wait_time); - - // Check if there is a leader. Note that it cannot be us since we - // gave up our leadership when we became a client. - if (leader_follower.leader_available ()) - { - // = Wait as a follower. - - // Grab the condtion variable. - TAO_SYNCH_CONDITION* cond = - orb_core->leader_follower_condition_variable (); - - if (TAO_debug_level >= 5) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - wait (follower) on Transport <%x>, cond <%x>\n"), - this->transport_, - cond)); - - // Keep the entry on the stack - TAO_Leader_Follower::TAO_Follower_Node node(cond); - - while (!reply_received && - leader_follower.leader_available ()) - { - // Add ourselves to the list, do it everytime we wake up - // from the CV loop. Because: - // - // - The leader thread could have elected us as the new - // leader. - // - Before we can assume the role another thread becomes - // the leader - // - But our condition variable could have been removed - // already, if we don't add it again we will never wake - // up. - // - // Notice that we can have spurious wake ups, in that case - // adding the leader results in an error, that must be - // ignored. - // You may be thinking of not removing the condition - // variable in the code that sends the signal, but - // removing it here, that does not work either, in that - // case the condition variable may be used to: - // - Wake up because its reply arrived - // - Wake up because it must become the leader - // but only the first one has any effect, so the leader is - // lost. - // - - (void) leader_follower.add_follower (&node); - - if (max_wait_time == 0) - { - if (cond == 0 || cond->wait () == -1) - { - if (TAO_debug_level >= 5) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - wait (follower) on <%x> ") - ACE_TEXT ("cond == 0 || cond->wait () == -1 : cond = %d\n"), - this->transport_, (cond == 0) ? 0 : cond)); - - // @@ Michael: What is our error handling in this case? - // We could be elected as leader and no leader would come in? - return -1; - } - } - else - { - countdown.update (); - ACE_Time_Value tv = ACE_OS::gettimeofday (); - tv += *max_wait_time; - if (cond == 0 || cond->wait (&tv) == -1) - { - if (TAO_debug_level >= 5) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - wait (follower) on <%x> ") - ACE_TEXT ("cond == 0 || cond->wait (tv) == -1\n"), - this->transport_)); - - if (leader_follower.remove_follower (&node) == -1 - && reply_received == 0) - { - // Remove follower can fail because either - // 1) the reply arrived, or - // 2) somebody elected us as leader, or - // 3) the connection got closed. - // - // reply_received is 1, if the reply arrived. - // reply_received is 0, if the reply did not arrive yet. - // reply_received is -1, if the connection got closed - // - // Therefore: - // If remove_follower fails and reply_received is 0, we know that - // we got elected as a leader. As we cannot be the leader (remember - // we got a timeout), we have to select a new leader. - // - // ACE_DEBUG ((LM_DEBUG, - // "TAO (%P|%t) TAO_Wait_On_Leader_Follower::wait - " - // "We got elected as leader, but have timeout\n")); - - if (leader_follower.elect_new_leader () == -1) - ACE_ERROR ((LM_ERROR, - "TAO (%P|%t) TAO_Wait_On_Leader_Follower::wait - " - "elect_new_leader failed\n")); - - } - return -1; - } - } - } - - countdown.update (); - - // @@ Michael: This is an old comment why we do not want to - // remove the follower here. - // We should not remove the follower here, we *must* remove it when - // we signal it so the same condition is not signalled for - // both wake up as a follower and as the next leader. - - if (TAO_debug_level >= 5) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - done (follower) " - "on <%x>, reply_received %d\n"), - this->transport_, reply_received)); - - // Now somebody woke us up to become a leader or to handle our - // input. We are already removed from the follower queue. - - if (reply_received == 1) - return 0; - - // FALLTHROUGH - // We only get here if we woke up but the reply is not - // complete yet, time to assume the leader role.... - // i.e. ACE_ASSERT (reply_received == 0); - } - - // = Leader Code. - - // The only way to reach this point is if we must become the - // leader, because there is no leader or we have to update to a - // leader or we are doing nested upcalls in this case we do - // increase the refcount on the leader in TAO_ORB_Core. - - // Calls leader_follower.set_client_leader_thread () on - // construction and leader_follower.reset_client_leader_thread () - // on destruction. Note that this may increase the refcount of - // the leader. - TAO_LF_Client_Leader_Thread_Helper client_leader_thread_helper (leader_follower); - ACE_UNUSED_ARG (client_leader_thread_helper); - - { - ACE_GUARD_RETURN (ACE_Reverse_Lock<TAO_SYNCH_MUTEX>, rev_mon, - leader_follower.reverse_lock (), -1); - - // Become owner of the reactor. - ACE_Reactor *reactor = orb_core->reactor (); - reactor->owner (ACE_Thread::self ()); - - // Run the reactor event loop. - - if (TAO_debug_level >= 5) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - wait (leader):to enter reactor event loop on <%x>\n"), - this->transport_)); - - // If we got our reply, no need to run the event loop any - // further. - while (!reply_received) - { - // Run the event loop. - result = reactor->handle_events (max_wait_time); - - // Did we timeout? If so, stop running the loop. - if (result == 0 && - max_wait_time != 0 && - *max_wait_time == ACE_Time_Value::zero) - break; - - // Other errors? If so, stop running the loop. - if (result == -1) - break; - - // Otherwise, keep going... - } - - if (TAO_debug_level >= 5) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - wait : (leader) : done with reactor event loop on <%x>\n"), - this->transport_)); - } - } - // - // End artificial scope for auto_ptr like helpers calling: - // leader_follower.reset_client_thread () and (maybe) - // leader_follower.reset_client_leader_thread (). - // - - // Wake up the next leader, we cannot do that in handle_input, - // because the woken up thread would try to get into handle_events, - // which is at the time in handle_input still occupied. But do it - // before checking the error in <result>, even if there is an error - // in our input we should continue running the loop in another - // thread. - - if (leader_follower.elect_new_leader () == -1) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_TEXT ("TAO (%P|%t):TAO_Wait_On_Leader_Follower::send_request: ") - ACE_TEXT ("Failed to unset the leader and wake up a new follower.\n")), - -1); - - if (result == -1) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_TEXT ("TAO (%P|%t):TAO_Wait_On_Leader_Follower::wait: ") - ACE_TEXT ("handle_events failed.\n")), - -1); - - // Return an error if there was a problem receiving the reply... - if (max_wait_time != 0) - { - if (reply_received != 1 - && *max_wait_time == ACE_Time_Value::zero) - { - result = -1; - errno = ETIME; - } - else if (reply_received == -1) - { - // If the time did not expire yet, but we get a failure, - // e.g. the connections closed, we should still return an error. - result = -1; - } - } - else - { - result = 0; - if (reply_received == -1) - { - result = -1; - } - } - - return result; -} - -TAO_SYNCH_CONDITION * -TAO_Wait_On_Leader_Follower::leader_follower_condition_variable (void) -{ - return this->transport_->orb_core ()->leader_follower_condition_variable (); -} - -int -TAO_Wait_On_Leader_Follower::reply_dispatched (int &reply_received_flag, - TAO_SYNCH_CONDITION *condition) -{ - if (condition == 0) - return 0; - - TAO_Leader_Follower& leader_follower = - this->transport_->orb_core ()->leader_follower (); - - // Obtain the lock. - ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, - leader_follower.lock (), - -1); - - reply_received_flag = 1; - - // The following works as the node is assumed to be on the stack - // till the thread is alive. - TAO_Leader_Follower::TAO_Follower_Node node (condition); - - // We *must* remove it when we signal it so the same condition - // is not signalled for both wake up as a follower and as the - // next leader. - // The follower may not be there if the reply is received while - // the consumer is not yet waiting for it (i.e. it send the - // request but has not blocked to receive the reply yet). - // Ignore errors. - (void) leader_follower.remove_follower (&node); - - if (condition->signal () == -1) - return -1; - - return 0; -} - -void -TAO_Wait_On_Leader_Follower::connection_closed (int &reply_received_flag, - TAO_SYNCH_CONDITION *condition) -{ - if (condition == 0) - return; - TAO_Leader_Follower& leader_follower = this->transport_->orb_core ()->leader_follower (); - - // Obtain the lock. - ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, leader_follower.lock ()); - - reply_received_flag = -1; - - // The following works as the node is assumed to be on the stack - // till the thread is alive. - TAO_Leader_Follower::TAO_Follower_Node node(condition); - - (void) leader_follower.remove_follower (&node); - - (void) condition->signal (); + return leader_follower.wait_for_event (&rd, + this->transport_, + max_wait_time); } diff --git a/TAO/tao/Wait_On_Leader_Follower.h b/TAO/tao/Wait_On_Leader_Follower.h index 179964fde68..a65d19cd854 100644 --- a/TAO/tao/Wait_On_Leader_Follower.h +++ b/TAO/tao/Wait_On_Leader_Follower.h @@ -49,12 +49,9 @@ public: virtual int sending_request (TAO_ORB_Core *orb_core, int two_way); virtual int wait (ACE_Time_Value *max_wait_time, - int &reply_received); + TAO_Synch_Reply_Dispatcher &rd); virtual int register_handler (void); virtual int non_blocking (void); - virtual TAO_SYNCH_CONDITION *leader_follower_condition_variable (void); - virtual int reply_dispatched (int &, TAO_SYNCH_CONDITION *); - virtual void connection_closed (int &, TAO_SYNCH_CONDITION *); }; #include "ace/post.h" diff --git a/TAO/tao/Wait_On_Reactor.cpp b/TAO/tao/Wait_On_Reactor.cpp index b57ba86b1a2..ed5094c47f7 100644 --- a/TAO/tao/Wait_On_Reactor.cpp +++ b/TAO/tao/Wait_On_Reactor.cpp @@ -2,7 +2,8 @@ #include "tao/Wait_On_Reactor.h" #include "tao/ORB_Core.h" -#include "Transport.h" +#include "tao/Transport.h" +#include "tao/Synch_Reply_Dispatcher.h" ACE_RCSID(tao, Wait_On_Reactor, "$Id$") @@ -17,7 +18,7 @@ TAO_Wait_On_Reactor::~TAO_Wait_On_Reactor (void) int TAO_Wait_On_Reactor::wait (ACE_Time_Value *max_wait_time, - int &reply_received) + TAO_Synch_Reply_Dispatcher &rd) { // Reactor does not change inside the loop. ACE_Reactor* reactor = @@ -32,7 +33,7 @@ TAO_Wait_On_Reactor::wait (ACE_Time_Value *max_wait_time, // If we got our reply, no need to run the event loop any // further. - if (reply_received) + if (rd.reply_received ()) break; // Did we timeout? If so, stop running the loop. @@ -48,13 +49,13 @@ TAO_Wait_On_Reactor::wait (ACE_Time_Value *max_wait_time, // Otherwise, keep going... } - if (result == -1 || reply_received == -1) + if (result == -1 || rd.reply_received () == -1) return -1; // Return an error if there was a problem receiving the reply. if (max_wait_time != 0) { - if (reply_received != 1 && + if (rd.reply_received () != 1 && *max_wait_time == ACE_Time_Value::zero) { result = -1; @@ -64,7 +65,7 @@ TAO_Wait_On_Reactor::wait (ACE_Time_Value *max_wait_time, else { result = 0; - if (reply_received == -1) + if (rd.reply_received () == -1) result = -1; } diff --git a/TAO/tao/Wait_On_Reactor.h b/TAO/tao/Wait_On_Reactor.h index 3e69a25a9ba..412c194d2af 100644 --- a/TAO/tao/Wait_On_Reactor.h +++ b/TAO/tao/Wait_On_Reactor.h @@ -41,7 +41,7 @@ public: // = Documented in TAO_Wait_Strategy. virtual int wait (ACE_Time_Value *max_wait_time, - int &reply_received); + TAO_Synch_Reply_Dispatcher &rd); virtual int register_handler (void); virtual int non_blocking (void); }; diff --git a/TAO/tao/Wait_On_Read.cpp b/TAO/tao/Wait_On_Read.cpp index 29e983a90db..149a75d17e7 100644 --- a/TAO/tao/Wait_On_Read.cpp +++ b/TAO/tao/Wait_On_Read.cpp @@ -1,8 +1,9 @@ // $Id$ #include "tao/Wait_On_Read.h" -#include "Transport.h" -#include "Resume_Handle.h" +#include "tao/Transport.h" +#include "tao/Resume_Handle.h" +#include "tao/Synch_Reply_Dispatcher.h" ACE_RCSID(tao, Wait_On_Read, "$Id$") @@ -20,9 +21,9 @@ TAO_Wait_On_Read::~TAO_Wait_On_Read (void) // Wait on the read operation. int TAO_Wait_On_Read::wait (ACE_Time_Value * max_wait_time, - int &reply_received) + TAO_Synch_Reply_Dispatcher &rd) { - reply_received = 0; + rd.reply_received () = 0; // Do the same sort of looping that is done in other wait // strategies. @@ -37,7 +38,7 @@ TAO_Wait_On_Read::wait (ACE_Time_Value * max_wait_time, // If we got our reply, no need to run the loop any // further. - if (reply_received) + if (rd.reply_received ()) break; // @@ We are not checking for timeouts here... @@ -47,12 +48,12 @@ TAO_Wait_On_Read::wait (ACE_Time_Value * max_wait_time, break; } - if (reply_received == -1 || retval == -1) + if (rd.reply_received () == -1 || retval == -1) { this->transport_->close_connection (); } - return (reply_received == 1 ? 0 : reply_received); + return (rd.reply_received () == 1 ? 0 : rd.reply_received ()); } // No-op. diff --git a/TAO/tao/Wait_On_Read.h b/TAO/tao/Wait_On_Read.h index b6a3c65bc4e..142c7905a48 100644 --- a/TAO/tao/Wait_On_Read.h +++ b/TAO/tao/Wait_On_Read.h @@ -40,7 +40,7 @@ public: // = Documented in TAO_Wait_Strategy. virtual int wait (ACE_Time_Value *max_wait_time, - int &reply_received); + TAO_Synch_Reply_Dispatcher &rd); virtual int register_handler (void); virtual int non_blocking (void); }; diff --git a/TAO/tao/Wait_Strategy.cpp b/TAO/tao/Wait_Strategy.cpp index 336370dca13..3f79613f092 100644 --- a/TAO/tao/Wait_Strategy.cpp +++ b/TAO/tao/Wait_Strategy.cpp @@ -27,28 +27,3 @@ TAO_Wait_Strategy::sending_request (TAO_ORB_Core * /* orb_core */, { return 0; } - -TAO_SYNCH_CONDITION * -TAO_Wait_Strategy::leader_follower_condition_variable (void) -{ - return 0; -} - -int -TAO_Wait_Strategy::reply_dispatched (int &reply_received, - TAO_SYNCH_CONDITION *) -{ - // In most implementations of this strategy there is no need to - // acquire any mutex to set the reply_received flag. - reply_received = 1; - return 0; -} - -void -TAO_Wait_Strategy::connection_closed (int &reply_received, - TAO_SYNCH_CONDITION*) -{ - // In most implementations of this strategy there is no need to - // acquire any mutex to set the reply_received flag. - reply_received = -1; -} diff --git a/TAO/tao/Wait_Strategy.h b/TAO/tao/Wait_Strategy.h index c00e64c11a8..a9bf3d29618 100644 --- a/TAO/tao/Wait_Strategy.h +++ b/TAO/tao/Wait_Strategy.h @@ -23,6 +23,7 @@ class TAO_ORB_Core; class TAO_Transport; +class TAO_Synch_Reply_Dispatcher; /** * @class TAO_Wait_Strategy @@ -51,7 +52,7 @@ public: /// Base class virtual method. Wait till the <reply_received> flag is /// true or the time expires. virtual int wait (ACE_Time_Value *max_wait_time, - int &reply_received) = 0; + TAO_Synch_Reply_Dispatcher &rd) = 0; /// Register the handler needs with the reactor provided that it makes /// sense for the strategy. @@ -61,37 +62,6 @@ public: /// the socket on which it is waiting to non-blocking mode or not. virtual int non_blocking (void) = 0; - /** - * Return the TSS leader follower condition variable used in the - * Wait Strategy. Muxed Leader Follower implementation returns a - * valid condition variable, others return 0. - * The condition variable is acquired by the Reply_Dispatcher (when - * needed) in the thread that binds it to the right Transport. - * Later (when the reply is finally received) the Reply_Dispatcher - * passes this condition variable back to Waiting_Strategy, that can - * then signal the waiting thread if needed. - */ - virtual TAO_SYNCH_CONDITION *leader_follower_condition_variable (void); - - /** - * This is the callback used by the Reply_Dispatcher to inform the - * Waiting_Strategy that a reply has been completely received, that - * it was already stored in the right place, and that the condition - * variable should be signalled if needed. - * The Waiting_Strategy must set the reply received flag, using - * whatever locks it needs. - */ - virtual int reply_dispatched (int &reply_received_flag, - TAO_SYNCH_CONDITION *); - - /** - * The connection has been closed by the lower level components in - * the ORB. - * The wait has finished and must result in an error. - */ - virtual void connection_closed (int &reply_received_flag, - TAO_SYNCH_CONDITION*); - /// Get/Set method for the flag int is_registered (void); void is_registered (int flag); diff --git a/TAO/tao/default_client.cpp b/TAO/tao/default_client.cpp index 59549a457b9..804dc0764a2 100644 --- a/TAO/tao/default_client.cpp +++ b/TAO/tao/default_client.cpp @@ -24,7 +24,7 @@ TAO_Default_Client_Strategy_Factory::TAO_Default_Client_Strategy_Factory (void) this->wait_strategy_ = TAO_WAIT_ON_LEADER_FOLLOWER; #endif /* TAO_USE_ST_CLIENT_CONNECTION_HANDLER */ -#if defined (TAO_USE_MUXED_TRANSPORT_MUX_STRATEGY) +#if TAO_USE_MUXED_TRANSPORT_MUX_STRATEGY == 1 this->transport_mux_strategy_ = TAO_MUXED_TMS; #else this->transport_mux_strategy_ = TAO_EXCLUSIVE_TMS; diff --git a/TAO/tao/default_resource.cpp b/TAO/tao/default_resource.cpp index e796014d92e..580bfeb3db5 100644 --- a/TAO/tao/default_resource.cpp +++ b/TAO/tao/default_resource.cpp @@ -12,9 +12,13 @@ #include "tao/Reactive_Flushing_Strategy.h" #include "tao/Block_Flushing_Strategy.h" +#include "tao/Leader_Follower_Flushing_Strategy.h" + #include "tao/Leader_Follower.h" #include "tao/LRU_Connection_Purging_Strategy.h" +#include "tao/LF_Strategy_Complete.h" + #include "ace/TP_Reactor.h" #include "ace/Dynamic_Service.h" #include "ace/Arg_Shifter.h" @@ -26,8 +30,6 @@ ACE_RCSID(tao, default_resource, "$Id$") - - TAO_Default_Resource_Factory::TAO_Default_Resource_Factory (void) : use_tss_resources_ (0), use_locked_data_blocks_ (1), @@ -39,10 +41,10 @@ TAO_Default_Resource_Factory::TAO_Default_Resource_Factory (void) purge_percentage_ (TAO_PURGE_PERCENT), reactor_mask_signals_ (1), dynamically_allocated_reactor_ (0), + cached_connection_lock_type_ (TAO_THREAD_LOCK), options_processed_ (0), factory_disabled_ (0), - cached_connection_lock_type_ (TAO_THREAD_LOCK), - flushing_strategy_type_ (TAO_REACTIVE_FLUSHING) + flushing_strategy_type_ (TAO_LEADER_FOLLOWER_FLUSHING) { } @@ -81,7 +83,7 @@ TAO_Default_Resource_Factory::init (int argc, char **argv) return 0; } this->options_processed_ = 1; - + this->parser_names_count_ = 0; int curarg = 0; @@ -335,6 +337,9 @@ TAO_Default_Resource_Factory::init (int argc, char **argv) char *name = argv[curarg]; if (ACE_OS::strcasecmp (name, + "leader_follower") == 0) + this->flushing_strategy_type_ = TAO_LEADER_FOLLOWER_FLUSHING; + else if (ACE_OS::strcasecmp (name, "reactive") == 0) this->flushing_strategy_type_ = TAO_REACTIVE_FLUSHING; else if (ACE_OS::strcasecmp (name, @@ -844,7 +849,11 @@ TAO_Flushing_Strategy * TAO_Default_Resource_Factory::create_flushing_strategy (void) { TAO_Flushing_Strategy *strategy = 0; - if (this->flushing_strategy_type_ == TAO_REACTIVE_FLUSHING) + if (this->flushing_strategy_type_ == TAO_LEADER_FOLLOWER_FLUSHING) + ACE_NEW_RETURN (strategy, + TAO_Leader_Follower_Flushing_Strategy, + 0); + else if (this->flushing_strategy_type_ == TAO_REACTIVE_FLUSHING) ACE_NEW_RETURN (strategy, TAO_Reactive_Flushing_Strategy, 0); @@ -884,7 +893,7 @@ TAO_Default_Resource_Factory::create_lf_strategy (void) TAO_LF_Strategy *strategy = 0; ACE_NEW_RETURN (strategy, - TAO_Complete_LF_Strategy, + TAO_LF_Strategy_Complete, 0); return strategy; diff --git a/TAO/tao/default_resource.h b/TAO/tao/default_resource.h index 3f189678498..917a5f7eb14 100644 --- a/TAO/tao/default_resource.h +++ b/TAO/tao/default_resource.h @@ -171,7 +171,7 @@ protected: /// If it has been disabled we should print warnings if options /// were processed before (or later). int factory_disabled_; - + private: enum Lock_Type { @@ -184,6 +184,7 @@ private: enum Flushing_Strategy_Type { + TAO_LEADER_FOLLOWER_FLUSHING, TAO_REACTIVE_FLUSHING, TAO_BLOCKING_FLUSHING }; diff --git a/TAO/tao/orbconf.h b/TAO/tao/orbconf.h index 52d26fc85f9..99e73675bbb 100644 --- a/TAO/tao/orbconf.h +++ b/TAO/tao/orbconf.h @@ -982,5 +982,10 @@ enum TAO_Policy_Scope # define TAO_DOESNT_RESUME_CONNECTION_HANDLER 0 #endif /*TAO_DOESNT_RESUMES_CONNECTION_HANDLER*/ +/// By default we use Muxed Transports +#if !defined (TAO_USE_MUXED_TRANSPORT_MUX_STRAGEGY) +# define TAO_USE_MUXED_TRANSPORT_MUX_STRAGEGY 1 +#endif /* TAO_USE_MUXED_TRANSPORT_MUX_STRAGEGY */ + #include "ace/post.h" #endif /* TAO_ORB_CONFIG_H */ |