diff options
-rw-r--r-- | ChangeLog-99b | 100 | ||||
-rw-r--r-- | README | 2 | ||||
-rw-r--r-- | ace/Future.cpp | 141 | ||||
-rw-r--r-- | ace/Future.h | 110 | ||||
-rw-r--r-- | ace/Future_Node.cpp | 43 | ||||
-rw-r--r-- | ace/Future_Node.h | 72 | ||||
-rw-r--r-- | ace/Future_Set.cpp | 129 | ||||
-rw-r--r-- | ace/Future_Set.h | 109 | ||||
-rw-r--r-- | ace/Makefile | 2 | ||||
-rw-r--r-- | ace/Synch.h | 6 |
10 files changed, 664 insertions, 50 deletions
diff --git a/ChangeLog-99b b/ChangeLog-99b index fe640961a7c..54137774d17 100644 --- a/ChangeLog-99b +++ b/ChangeLog-99b @@ -1,3 +1,93 @@ +Fri Jun 25 16:03:59 1999 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu> + + * ace/Future.cpp: Fixed a couple of places where the set() method + didn't have the new *this parameter added. + + * ace: Added Future_Set.{h,cpp} and Future_Node.{h,cpp}, as well + as the new Future.{h,cpp} files to the ACE release. Thanks to + John Tucker <jtucker@infoglide.com> for contributing these. + + * ace/Synch.h: Added a comment pointing out that Solaris + threads do not support timed acquire(). Thanks to Darren + DeRidder <darren.deridder@bridgewatersys.com> for reporting + this. + +Fri Jun 25 15:07:00 1999 John Tucker <jtucker@infoglide.com> + + The following describes the changes to ACE_Future_Set: + + * I followed the Observer Pattern where the ACE_Future_Set is the + "Observer" and the ACE_Future_Rep is the "Subject". + + * I created an abstract base class called ACE_Future_Observer<T> + which contains a single pure virtual member function "update()". + + * The ACE_Future_Rep is modified so that it contains a list of + ACE_Future_Observer<T> pointers and an interface for attaching + and detaching ACE_Future_Observer<T> Observer objects. The + "attach(...)" member function allows objects which implement the + ACE_Future_Observer<T> interface to attach themselves to + ACE_Future_Rep<T> objects so that they will be notified of + changes to the ACE_Future_Rep<T> Subject. The "detach(...)" + member function allows objects which implement the + ACE_Future_Observer<T> interface to detach themselves from + ACE_Future_Rep<T> objects so that they will no longer be + notified of changes to the ACE_Future_Rep<T> Subject. + + * The ACE_Future_Rep::set() method is modified so that it invokes the + "update()" method of each ACE_Future_Observer<T> object stored + in its list of attached ACE_Future_Observer<T> Observer + objects. It also removes each ACE_Future_Observer<T> Observer + object from its list. + + * The ACE_Future_Set<T> class derives from the ACE_Future_Observer<T> + class. + + * The ACE_Future_Set<T> class contains its own ACE_Message_Queue + attribute which will be used by writer threads to notify reader + threads that an ACE_Future is readable. + + * The ACE_Future_Set<T> class contains a list of ACE_Future<T> + Subject objects which clients in the reader threads insert into + it. When an ACE_Future<T> is inserted into an + ACE_Future_Set<T>, the ACE_Future_Set<T> attaches itself to the + specified ACE_Future<T>, keeping in mind the ACE_Future_Set<T> + implements the ACE_Future_Observer<T> interface, and also + inserts the specified ACE_Future<T> object into its list of + ACE_Future<T> objects. + + * The ACE_Future_Set<T> class implements its "update()" method to + just enqueue an ACE_Message_Block with Null data onto its + ACE_Message_Queue. Since this "ACE_Future_Set<T>::update()" + method will be invoked by the ACE_Future_Rep::set() method in + the writer thread, it will allow the ACE_Future_Set<T> in the + writer thread to signal itself that an ACE_Future has become + readable. + + * The "int ACE_Future_Set<T>::next_readable(ACE_Future<T> &future, + ACE_Time_Value *)" method in the reader thread will block via a + call to "dequeue()" on its ACE_Message_Queue until is awakened + by a call to "ACE_Future_Set<T>::update()" in the writer thread. + Once awakened, the ACE_Future_Set<T> object iterates through its + list of ACE_Future<T> objects until it encounters a readable + one. This readable ACE_Future<T> will be removed from its list + and assigned to the specified "future" parameter". + + * The ACE_Future_Set<T> destructor detaches itself from all + ACE_Future<T> objects remaining in its list. + + * I did not use the ACE_Reactor since I could foresee the + ACE_Future_Set being used by applications which did not run,or + want to run, the event loop. + +Fri Jun 25 14:28:05 1999 Ossama Othman <othman@cs.wustl.edu> + + * ace/UNIX_Addr.cpp (ACE_UNIX_Addr): Moved base_set() call in + constructor to member initializer list. This is cleaner coding + style. The original code, before a bug fix was made, was like + that. I just forgot to put it back in the member initializer + list after the fix. + Fri Jun 25 15:15:32 1999 Steve Huston <shuston@riverace.com> * performance-tests/Misc/Makefile: set static_libs_only so the library @@ -11,16 +101,6 @@ Fri Jun 25 15:15:32 1999 Steve Huston <shuston@riverace.com> #pragma implementation line. The compiler remembers the directory it came from. -Fri Jun 25 14:28:05 1999 Ossama Othman <othman@cs.wustl.edu> - - * ace/UNIX_Addr.cpp (ACE_UNIX_Addr): Moved base_set() call in - constructor to member initializer list. This is cleaner coding - style. The original code, before a bug fix was made, was like - that. I just forgot to put it back in the member initializer - list after the fix. - -Fri Jun 25 13:02:33 1999 Steve Huston <shuston@riverace.com> - * examples/ASX/UPIPE_Event_Server/Peer_Router.h: * examples/Connection/non_blocking/CPP-connector.h: * examples/Connection/non_blocking/CPP-acceptor.h: @@ -875,6 +875,8 @@ Gilbert Roulot <gilbert.roulot@tts.thomson-csf.com> Gildo Medeiros Junior <gildo@siemens.com.br> Brian Peterson <bpeterson@globalmt.com> Fabrice Podlyski <podlyski@clrhp04.in2p3.fr> +Darren DeRidder <darren.deridder@bridgewatersys.com> +John Tucker <jtucker@infoglide.com> I would particularly like to thank Paul Stephenson, who worked with me at Ericsson and is now at ObjectSpace. Paul devised the recursive diff --git a/ace/Future.cpp b/ace/Future.cpp index 236b2d681bf..dc45f309b7d 100644 --- a/ace/Future.cpp +++ b/ace/Future.cpp @@ -16,6 +16,16 @@ ACE_RCSID(ace, Future, "$Id$") #if defined (ACE_HAS_THREADS) +template <class T> +ACE_Future_Observer<T>::ACE_Future_Observer (void) +{ +} + +template <class T> +ACE_Future_Observer<T>::~ACE_Future_Observer (void) +{ +} + // Dump the state of an object. template <class T> void @@ -26,10 +36,10 @@ ACE_Future_Rep<T>::dump (void) const "ref_count_ = %d\n", (int) this->ref_count_)); ACE_DEBUG ((LM_INFO,"value_: \n")); - if (this->value_) + if(this->value_) ACE_DEBUG ((LM_DEBUG, ASYS_TEXT (" (NON-NULL)\n"))); else - ACE_DEBUG ((LM_DEBUG, ASYS_TEXT (" (NULL)\n"))); + ACE_DEBUG ((LM_DEBUG, ASYS_TEXT (" (NULL)\n"))); ACE_DEBUG ((LM_INFO,"value_ready_: \n")); this->value_ready_.dump (); @@ -62,7 +72,7 @@ ACE_Future_Rep<T>::detach (ACE_Future_Rep<T>*& rep) // Use value_ready_mutex_ for both condition and ref count management ACE_MT (ACE_GUARD (ACE_Thread_Mutex, r_mon, rep->value_ready_mutex_)); - if (rep->ref_count_-- == 0) + if(rep->ref_count_-- == 0) { ACE_MT (r_mon.release ()); // We do not need the lock when deleting the representation. @@ -84,7 +94,7 @@ ACE_Future_Rep<T>::assign (ACE_Future_Rep<T>*& rep, ACE_Future_Rep<T>* new_rep) rep = new_rep; // detached old last for exception safety - if (old->ref_count_-- == 0) + if(old->ref_count_-- == 0) { ACE_MT (r_mon.release ()); // We do not need the lock when deleting the representation. @@ -115,17 +125,31 @@ ACE_Future_Rep<T>::ready (void) } template <class T> int -ACE_Future_Rep<T>::set (const T &r) +ACE_Future_Rep<T>::set (const T &r, + ACE_Future<T> &caller) { // If the value is already produced, ignore it... - if (this->value_ == 0) + if(this->value_ == 0) { ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->value_ready_mutex_, -1)); // Otherwise, create a new result value. Note the use of the // Double-checked locking pattern to avoid multiple allocations. - if (this->value_ == 0) - ACE_NEW_RETURN (this->value_, T (r), -1); + if(this->value_ == 0) + ACE_NEW_RETURN (this->value_, + T (r), + -1); + + // Remove and notify all subscribed observers. + for (OBSERVER_NODE *node = this->observer_list_.delete_head(); + node; + node = this->observer_list_.delete_head()) + { + OBSERVER *observer = ACE_reinterpret_cast (OBSERVER *, + node->item_); + delete node; + observer->update (caller); + } // Signal all the waiting threads. return this->value_ready_.broadcast (); @@ -149,7 +173,7 @@ ACE_Future_Rep<T>::get (T &value, while (this->value_ == 0) // Perform a timed wait. - if (this->value_ready_.wait (tv) == -1) + if(this->value_ready_.wait (tv) == -1) return -1; // Destructor releases the lock. @@ -159,11 +183,64 @@ ACE_Future_Rep<T>::get (T &value, return 0; } +template <class T> void +ACE_Future_Rep<T>::attach(ACE_Future_Observer<T> *observer, + ACE_Future<T> &caller) +{ + ACE_MT (ACE_GUARD (ACE_Thread_Mutex, r_mon, this->value_ready_mutex_));; + + // Otherwise, create a new result value. Note the use of the + // Double-checked locking pattern to avoid corrupting the list. + + // If the value is already produced, then notify observer + if (this->value_ == 0) + { + OBSERVER_NODE *node; + ACE_NEW (node, + OBSERVER_NODE ((void *&) observer)); + + this->observer_list_.insert_tail (node); + } + else + observer->update(caller); +} + +template <class T> int +ACE_Future_Rep<T>::detach(ACE_Future_Observer<T> *observer) +{ + ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->value_ready_mutex_, 0)); + + int result = 0; + + // Remove all occurrences of the specified observer from this + // objects list. Note the use of the Double-checked locking pattern + // to avoid corrupting the list. + + OBSERVER_NODE *node = 0; + for (OBSERVER_LIST::ITERATOR iter (this->observer_list_); + (node = iter.next ()) != 0; + iter.advance ()) + { + OBSERVER *curr_observer = + ACE_reinterpret_cast (OBSERVER *, + node->item_); + + if (curr_observer == curr_observer) + { + this->observer_list_.remove (node); + delete node; + result = 1; + } + } + + return result; +} + template <class T> ACE_Future_Rep<T>::operator T () { // If the value is already produced, return it. - if (this->value_ == 0) + if(this->value_ == 0) { // Constructor of ace_mon acquires the mutex. ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->value_ready_mutex_, 0)); @@ -188,13 +265,13 @@ ACE_Future_Rep<T>::operator T () template <class T> ACE_Future<T>::ACE_Future (void) - : future_rep_ (FUTURE_REP::create ()) +: future_rep_ (FUTURE_REP::create ()) { } template <class T> ACE_Future<T>::ACE_Future (const ACE_Future<T> &r) - : future_rep_ (FUTURE_REP::attach (((ACE_Future<T> &) r).future_rep_)) +: future_rep_ (FUTURE_REP::attach (((ACE_Future<T> &) r).future_rep_)) { } @@ -202,8 +279,10 @@ template <class T> ACE_Future<T>::ACE_Future (const T &r) : future_rep_ (FUTURE_REP::create ()) { - ACE_DEBUG ((LM_DEBUG, ASYS_TEXT (" (%t) funny constructor\n"))); - this->future_rep_->set (r); + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT (" (%t) funny constructor\n"))); + this->future_rep_->set (r, + *this); } template <class T> @@ -228,7 +307,8 @@ template <class T> int ACE_Future<T>::cancel (const T &r) { this->cancel (); - return this->future_rep_->set (r); + return this->future_rep_->set (r, + *this); } template <class T> int @@ -236,7 +316,8 @@ ACE_Future<T>::cancel (void) { // If this ACE_Future is already attached to a ACE_Future_Rep, // detach it (maybe delete the ACE_Future_Rep). - FUTURE_REP::assign (this->future_rep_, FUTURE_REP::create ()); + FUTURE_REP::assign (this->future_rep_, + FUTURE_REP::create ()); return 0; } @@ -244,8 +325,10 @@ template <class T> int ACE_Future<T>::set (const T &r) { // Give the pointer to the result to the ACE_Future_Rep. - return this->future_rep_->set (r); + return this->future_rep_->set (r, + *this); } + template <class T> int ACE_Future<T>::ready (void) { @@ -254,12 +337,25 @@ ACE_Future<T>::ready (void) } template <class T> int -ACE_Future<T>::get (T &value, ACE_Time_Value *tv) +ACE_Future<T>::get (T &value, + ACE_Time_Value *tv) { // We return the ACE_Future_rep. return this->future_rep_->get (value, tv); } +template <class T> void +ACE_Future<T>::attach(ACE_Future_Observer<T> *observer) +{ + this->future_rep_->attach (observer, *this); +} + +template <class T> int +ACE_Future<T>::detach(ACE_Future_Observer<T> *observer) +{ + return this->future_rep_->detach (observer); +} + template <class T> ACE_Future<T>::operator T () { @@ -286,7 +382,7 @@ ACE_Future<T>::operator = (const ACE_Future<T> &rhs) // bind <this> to the same <ACE_Future_Rep> as <r>. // This will work if &r == this, by first increasing the ref count - ACE_Future<T> &r = ( ACE_Future<T> &) rhs; + ACE_Future<T> &r = (ACE_Future<T> &) rhs; FUTURE_REP::assign (this->future_rep_, FUTURE_REP::attach (r.future_rep_)); } @@ -294,11 +390,14 @@ ACE_Future<T>::operator = (const ACE_Future<T> &rhs) template <class T> void ACE_Future<T>::dump (void) const { - ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); + ACE_DEBUG ((LM_DEBUG, + ACE_BEGIN_DUMP, this)); if (this->future_rep_) this->future_rep_->dump (); - ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); + + ACE_DEBUG ((LM_DEBUG, + ACE_END_DUMP)); } template <class T> void * diff --git a/ace/Future.h b/ace/Future.h index e512f627f26..f33a4001d3c 100644 --- a/ace/Future.h +++ b/ace/Future.h @@ -10,9 +10,10 @@ // Future.h // // = AUTHOR -// Andres Kruse <Andres.Kruse@cern.ch>, Douglas C. Schmidt -// <schmidt@cs.wustl.edu>, and Per Andersson -// <Per.Andersson@hfera.ericsson.se>. +// Andres Kruse <Andres.Kruse@cern.ch>, +// Douglas C. Schmidt <schmidt@cs.wustl.edu>, +// Per Andersson <Per.Andersson@hfera.ericsson.se>, and +// John Tucker <jtucker@infoglide.com> // // ============================================================================ @@ -20,6 +21,7 @@ #define ACE_FUTURE_H #include "ace/Synch.h" +#include "ace/Containers_T.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once @@ -29,6 +31,36 @@ // Forward decl. template <class T> class ACE_Future; +template <class T> class ACE_Future_Observer; + +template <class T> +class ACE_Future_Observer +{ + // = TITLE + // ACE_Future_Observer<T> + // + // = DESCRIPTION + // An ACE_Future_Observer<T> object implements an object that is + // subscribed with an ACE_Future<T> object so that it may be + // notified when the value of the ACE_Future<T> object is + // written to by a writer thread. + // + // It uses the Observer pattern +public: + // = Destructor + virtual ~ACE_Future_Observer (void); + + virtual void update(const ACE_Future<T> &future) = 0; + // Called by the ACE_Future<T> in which we are subscribed to when + // its value is written to. + + ACE_ALLOC_HOOK_DECLARE; + // Declare the dynamic allocation hooks. +protected: + + // = Constructor + ACE_Future_Observer (void); +}; template <class T> class ACE_Future_Rep @@ -37,10 +69,10 @@ class ACE_Future_Rep // ACE_Future_Rep<T> // // = DESCRIPTION - // An ACE_Future_Rep<T> object encapsules a pointer to an - // object of class T which is the result of an asynchronous - // method invocation. It is pointed to by ACE_Future<T> object[s] - // and only accessible through them. + // An ACE_Future_Rep<T> object encapsules a pointer to an object + // of class T which is the result of an asynchronous method + // invocation. It is pointed to by ACE_Future<T> object[s] and + // only accessible through them. private: friend class ACE_Future<T>; @@ -49,33 +81,53 @@ private: // instances. static ACE_Future_Rep<T> *create (void); - // Create a ACE_Future_Rep<T> and initialize the reference count + // Create a ACE_Future_Rep<T> and initialize the reference count. static ACE_Future_Rep<T> *attach (ACE_Future_Rep<T> *&rep); - // Precondition(rep != 0) - // Increase the reference count and return argument. Uses - // the attribute "value_ready_mutex_" to synchronize reference - // count updating + // Increase the reference count and return argument. Uses the + // attribute "value_ready_mutex_" to synchronize reference count + // updating. + // + // Precondition(rep != 0). static void detach (ACE_Future_Rep<T> *&rep); + // Decreases the reference count and and deletes rep if there are no + // more references to rep. + // // Precondition(rep != 0) - // Decreases the reference count and and deletes rep if - // there are no more references to rep. static void assign (ACE_Future_Rep<T> *&rep, ACE_Future_Rep<T> *new_rep); - // Precondition(rep != 0 && new_rep != 0) // Decreases the rep's reference count and and deletes rep if there - // are no more references to rep. Then assigns new_rep to rep + // are no more references to rep. Then assigns new_rep to rep. + // + // Precondition(rep != 0 && new_rep != 0) - int set (const T &r); - // Set the result value. + int set (const T &r, + ACE_Future<T> &caller); + // Set the result value. The specified <caller> represents the + // future that invoked this <set> method, which is used to notify + // the list of future observers. int get (T &value, ACE_Time_Value *tv); // Wait up to <tv> time to get the <value>. Note that <tv> must be // specified in absolute time rather than relative time. + void attach (ACE_Future_Observer<T> *observer, + ACE_Future<T> &caller); + // Attaches the specified observer to a subject (i.e. the + // ACE_Future_Rep). The update method of the specified subject will + // be invoked with a copy of the written-to ACE_Future as input when + // the result gets set. + + int detach (ACE_Future_Observer<T> *observer); + // Detaches the specified observer from a subject (i.e. the + // ACE_Future_Rep). The update method of the specified subject will + // not be invoked when the ACE_Future_Reps result gets set. Returns + // 1 if the specified observer was actually attached to the subject + // prior to this call and 0 if was not. + operator T (); // Type conversion. will block forever until the result is // available. Note that this method is going away in a subsequent @@ -104,6 +156,13 @@ private: int ref_count_; // Reference count. + typedef ACE_Future_Observer<T> OBSERVER; + typedef ACE_DLList_Node OBSERVER_NODE; + typedef ACE_Double_Linked_List<OBSERVER_NODE> OBSERVER_LIST; + + OBSERVER_LIST observer_list_; + // Keep a list of ACE_Future_Observers unread by client's reader thread. + // = Condition variable and mutex that protect the <value_>. ACE_Condition_Thread_Mutex value_ready_; ACE_Thread_Mutex value_ready_mutex_; @@ -176,6 +235,21 @@ public: int ready (void); // Check if the result is available. + void attach (ACE_Future_Observer<T> *observer); + // Attaches the specified observer to a subject (i.e. the + // ACE_Future). The update method of the specified subject will be + // invoked with a copy of the associated ACE_Future as input when + // the result gets set. If the result is already set when this + // method gets invoked, then the update method of the specified + // subject will be invoked immediately. + + int detach (ACE_Future_Observer<T> *observer); + // Detaches the specified observer from a subject (i.e. the + // ACE_Future_Rep). The update method of the specified subject will + // not be invoked when the ACE_Future_Reps result gets set. Returns + // 1 if the specified observer was actually attached to the subject + // prior to this call and 0 if was not. + void dump (void) const; // Dump the state of an object. diff --git a/ace/Future_Node.cpp b/ace/Future_Node.cpp new file mode 100644 index 00000000000..79c8706dc7f --- /dev/null +++ b/ace/Future_Node.cpp @@ -0,0 +1,43 @@ +// Future.cpp +// $Id$ + +#define ACE_BUILD_DLL + +#ifndef ACE_FUTURE_NODE_CPP +#define ACE_FUTURE_NODE_CPP + +#include "ace/OS.h" +#include "ace/Future_Node.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +#pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +ACE_RCSID(ace, Future_Node, "$Id$") + +#if defined (ACE_HAS_THREADS) + +template <class T> +ACE_DLList_Future_Node<T>::ACE_DLList_Future_Node (void) + : next_ (0), + prev_ (0) +{ +} + +template <class T> +ACE_DLList_Future_Node<T>::ACE_DLList_Future_Node (const ACE_Future<T> &item, + ACE_DLList_Future_Node<T> *n, + ACE_DLList_Future_Node<T> *p) +: item_ (item), + next_ (n), + prev_ (p) +{ +} + +template <class T> +ACE_DLList_Future_Node<T>::~ACE_DLList_Future_Node (void) +{ +} + +#endif /* ACE_HAS_THREADS */ +#endif /* ACE_FUTURE_NODE_CPP */ diff --git a/ace/Future_Node.h b/ace/Future_Node.h new file mode 100644 index 00000000000..8590cbaccca --- /dev/null +++ b/ace/Future_Node.h @@ -0,0 +1,72 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// ace +// +// = FILENAME +// Future_Node.h +// +// = AUTHOR +// John Tucker <jtucker@infoglide.com> +// +// ============================================================================ + +#ifndef ACE_FUTURE_NODE_H +#define ACE_FUTURE_NODE_H + +#include "ace/Future.h" +#include "ace/Thread.h" +#include "ace/Containers_T.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +#pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#if defined (ACE_HAS_THREADS) + +// Forward decl. +template <class T> class ACE_Future_Node; +template <class T> class ACE_DLList_Future_Node; + +template <class T> +class ACE_DLList_Future_Node +{ + // = TITLE + // Implementation of element in a ACE_Future list. + // Needed for ACE_Double_Linked_List. + + friend class ACE_Double_Linked_List<ACE_DLList_Future_Node>; + friend class ACE_Double_Linked_List_Iterator<ACE_DLList_Future_Node>; + +public: + // = Initialization + ACE_DLList_Future_Node (const ACE_Future<T> &future, + ACE_DLList_Future_Node *n = 0, + ACE_DLList_Future_Node *p = 0); + ~ACE_DLList_Future_Node (void); + + ACE_ALLOC_HOOK_DECLARE; + // Declare the dynamic allocation hooks. + + ACE_Future<T> item_; + ACE_DLList_Future_Node *next_; + ACE_DLList_Future_Node *prev_; + +protected: + ACE_DLList_Future_Node (void); +}; + + +#if defined (ACE_TEMPLATES_REQUIRE_SOURCE) +#include "ace/Future_Node.cpp" +#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */ + +#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA) +#pragma implementation ("Future_Node.cpp") +#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */ + +#endif /* ACE_HAS_THREADS */ +#endif /* ACE_FUTURE_NODE_H */ diff --git a/ace/Future_Set.cpp b/ace/Future_Set.cpp new file mode 100644 index 00000000000..58a7ec6fba6 --- /dev/null +++ b/ace/Future_Set.cpp @@ -0,0 +1,129 @@ +// Future.cpp +// $Id$ + +#define ACE_BUILD_DLL + +#ifndef ACE_FUTURE_SET_CPP +#define ACE_FUTURE_SET_CPP + +#include "ace/Future_Set.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +#pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +ACE_RCSID(ace, Future_Set, "$Id$") + +#if defined (ACE_HAS_THREADS) + +template <class T> +ACE_Future_Set<T>::ACE_Future_Set(ACE_Message_Queue<ACE_SYNCH> *new_queue) + : delete_queue_ (0) +{ + if (new_queue) + this->future_notification_queue_ = new_queue; + else + { + ACE_NEW (this->future_notification_queue_, + ACE_Message_Queue<ACE_SYNCH>); + this->delete_queue_ = 1; + } +} + +template <class T> +ACE_Future_Set<T>::ACE_Future_Set(const ACE_Future_Set<T> &r) +{ +} + +template <class T> +ACE_Future_Set<T>::~ACE_Future_Set(void) +{ + // Detach ourselves from all remaining futures, if any, + // in our list. + for (FUTURE_NODE *node = this->future_list_.delete_head (); + node != 0; + node = this->future_list_.delete_head ()) + { + node->item_.detach (this); + delete node; + } + + if (this->delete_queue_ != 0) + delete this->future_notification_queue_; +} + +template <class T> int +ACE_Future_Set<T>::is_empty() const +{ + return this->future_list_.is_empty (); +} + +template <class T> void +ACE_Future_Set<T>::insert (ACE_Future<T> &future) +{ + FUTURE_NODE *node; + ACE_NEW (node, + FUTURE_NODE (future)); + this->future_list_.insert_tail (node); + + // Attach ourself to the ACE_Futures list of observer + future.attach (this); +} + +template <class T> void +ACE_Future_Set<T>::update (const ACE_Future<T> &future) +{ + ACE_Message_Block *mb; + ACE_NEW (mb, + ACE_Message_Block ((char *) 0, 0)); + + // Enqueue in priority order. + this->future_notification_queue_->enqueue (mb, 0); +} + +template <class T> int +ACE_Future_Set<T>::next_readable (ACE_Future<T> &future, + ACE_Time_Value *tv) +{ + if (this->is_empty ()) + return 0; + + ACE_Message_Block *mb; + + // Wait for a "readable future" signal from the message queue. + if (this->future_notification_queue_->dequeue_head (mb, + tv) != -1) + // Delete the message block. + mb->release (); + else + return 0; + + // Remove all nodes containing the specified future from our list. + int count = 0; + FUTURE_NODE *node = 0; + + for (FUTURE_LIST::ITERATOR iter (this->future_list_); + (node = iter.next ()) != 0; + iter.advance ()) + { + ++count; + if (node->item_.ready ()) + { + future = node->item_; + this->future_list_.remove (node); + delete node; + + // NOTE: if the user inserted the same future into the list + // more than once, then maybe I should loop through the + // remaining futures in the list and remove all of those + // futures which are equal to the one we are returning. + return 1; + } + } + + return 0; +} + +#endif /* ACE_HAS_THREADS */ +#endif /* ACE_FUTURE_SET_CPP */ + diff --git a/ace/Future_Set.h b/ace/Future_Set.h new file mode 100644 index 00000000000..5318781995e --- /dev/null +++ b/ace/Future_Set.h @@ -0,0 +1,109 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// ace +// +// = FILENAME +// Future_Set.h +// +// = AUTHOR +// John Tucker <jtucker@infoglide.com> +// +// ============================================================================ + +#ifndef ACE_FUTURE_SET_H +#define ACE_FUTURE_SET_H + +#include "ace/Thread.h" +#include "ace/Containers_T.h" +#include "ace/Message_Queue.h" + +#include "ace/Future.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +#pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#if defined (ACE_HAS_THREADS) + +// Forward decl. +template <class T> class ACE_Future_Set; + +template <class T> +class ACE_Future_Set : public ACE_Future_Observer<T> +{ + // = TITLE + // This class implements a mechanism which allows the values of + // a collections of ACE_Future<T> objects to be accessed by + // reader threads as they become available. +public: + // = Initialization and termination methods. + + ACE_Future_Set (ACE_Message_Queue<ACE_SYNCH> *future_notification_queue_ = 0); + // Constructor. + + ~ACE_Future_Set (void); + // Destructor. + + int is_empty (void) const; + // Return 1 if their are no ACE_Future objects left on its queue and + // 0 otherwise + + void insert (ACE_Future<T> &future); + // Enqueus the given ACE_Future into this objects queue when it is + // readable. + + int next_readable (ACE_Future<T> &result, + ACE_Time_Value *tv = 0); + // Wait up to <tv> time to get the <value>. Note that <tv> must be + // specified in absolute time rather than relative time.); get the + // next ACE_Future<T> that is readable. If <tv> = 0, the will block + // forever. + // + // If a readable future becomes available, then the input result + // will be assigned with it and 1 will will be returned. If the set + // is empty, then 0 is returned. + + virtual void update (const ACE_Future<T> &future); + // Called by the ACE_Future<T> subject in which we are subscribed to + // when its value is written to. + + ACE_ALLOC_HOOK_DECLARE; + // Declare the dynamic allocation hooks. + +private: + ACE_Future_Set (const ACE_Future_Set &r); + // Copy constructor binds <this> and <r> to the same + // <ACE_Future_Set>. An <ACE_Future_Set> is created if necessary. + + typedef ACE_DLList_Future_Node<T> + FUTURE_NODE; + typedef ACE_Double_Linked_List<FUTURE_NODE> + FUTURE_LIST; + + FUTURE_LIST future_list_; + // List of ACE_Futures, subjects, which have not been written to by + // client's writer thread. + + ACE_Message_Queue<ACE_SYNCH> *future_notification_queue_; + // Message queue for notifying reader thread of ACE_Futures written + // to by client's writer thread. + + int delete_queue_; + // Keeps track of whether we need to delete the message queue. +}; + +#if defined (ACE_TEMPLATES_REQUIRE_SOURCE) +#include "ace/Future_Set.cpp" +#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */ + +#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA) +#pragma implementation ("Future_Set.cpp") +#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */ + +#endif /* ACE_HAS_THREADS */ +#endif /* ACE_FUTURE_SET_H */ + diff --git a/ace/Makefile b/ace/Makefile index be5d21618e4..b2a823ff308 100644 --- a/ace/Makefile +++ b/ace/Makefile @@ -193,6 +193,8 @@ TEMPLATE_FILES = \ Free_List \ Functor_T \ Future \ + Future_Node \ + Future_Set \ Hash_Map_Manager_T \ Hash_Cache_Map_Manager_T \ IOStream_T \ diff --git a/ace/Synch.h b/ace/Synch.h index d3d42b7f7af..b71b13a2575 100644 --- a/ace/Synch.h +++ b/ace/Synch.h @@ -260,7 +260,11 @@ public: // Note that <tv> is assumed to be in "absolute" rather than // "relative" time. The value of <tv> is updated upon return, i.e., // the caller gets the amount of time that has elapsed while waiting - // to acquire the semaphore. + // to acquire the semaphore. + // + // NOTE: Solaris threads do not support timed semaphores. + // Therefore, if you're running on Solaris you might want to + // consider using the POSIX pthreads wrapper instead. int tryacquire (void); // Conditionally decrement the semaphore if count is greater than 0 |