diff options
author | jcej <jcej@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1999-02-24 21:49:53 +0000 |
---|---|---|
committer | jcej <jcej@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1999-02-24 21:49:53 +0000 |
commit | 26ebc93891c3d2296b5c148f8ee5061fc1969bc2 (patch) | |
tree | 6c94197c4eddc29198c94b4989a2eac40364b84d /docs/tutorials/017 | |
parent | ee5d61a2cbcc1df234c7b0cd5eb0eb7cd9ab040e (diff) | |
download | ATCD-26ebc93891c3d2296b5c148f8ee5061fc1969bc2.tar.gz |
*** empty log message ***
Diffstat (limited to 'docs/tutorials/017')
-rw-r--r-- | docs/tutorials/017/Barrier_i.cpp | 51 | ||||
-rw-r--r-- | docs/tutorials/017/Barrier_i.h | 11 | ||||
-rw-r--r-- | docs/tutorials/017/Makefile | 8 | ||||
-rw-r--r-- | docs/tutorials/017/barrier.cpp | 56 | ||||
-rw-r--r-- | docs/tutorials/017/barrier2.cpp | 178 | ||||
-rw-r--r-- | docs/tutorials/017/combine.shar | 76 | ||||
-rw-r--r-- | docs/tutorials/017/page02.html | 56 | ||||
-rw-r--r-- | docs/tutorials/017/page03.html | 19 | ||||
-rw-r--r-- | docs/tutorials/017/page04.html | 51 | ||||
-rw-r--r-- | docs/tutorials/017/page05.html | 7 | ||||
-rw-r--r-- | docs/tutorials/017/page06.html | 203 |
11 files changed, 652 insertions, 64 deletions
diff --git a/docs/tutorials/017/Barrier_i.cpp b/docs/tutorials/017/Barrier_i.cpp index 308b44cc397..c8d0c6ba5ed 100644 --- a/docs/tutorials/017/Barrier_i.cpp +++ b/docs/tutorials/017/Barrier_i.cpp @@ -10,6 +10,7 @@ Barrier::Barrier(void) : threads_(0) ,barrier_(0) + ,new_barrier_(0) { owner_ = ACE_OS::thr_self(); } @@ -21,6 +22,11 @@ Barrier::~Barrier(void) delete barrier_; } +void Barrier::owner( ACE_thread_t _owner ) +{ + owner_ = _owner; +} + // Report on the number of threads. u_int Barrier::threads(void) { @@ -65,6 +71,25 @@ int Barrier::wait(void) return -1; } + // If the threads() mutator has been used, new_barrier_ will + // point to a new ACE_Barrier instance. We'll use a + // traditional double-check here to move that new object into + // place and cleanup the old one. + if( new_barrier_ ) + { + // mutex so that only one thread can do this part. + ACE_Guard<ACE_Mutex> mutex(barrier_mutex_); + + // We only want the first thread to plug in the new barrier... + if( new_barrier_ ) + { + // out with the old and in with the new. + delete barrier_; + barrier_ = new_barrier_; + new_barrier_ = 0; + } + } + return barrier_->wait(); } @@ -100,26 +125,30 @@ int Barrier::done(void) */ int Barrier::make_barrier( int _wait ) { - // Wait for and delete any existing barrier. + // Ensure we have a valid thread count. + if( ! threads_.value() ) + { + return -1; + } + + // If a barrier already exists, we'll arrange for it to be + // replaced through the wait() method above. if( barrier_ ) { + // Create the new barrier that wait() will install for us. + ACE_NEW_RETURN(new_barrier_,ACE_Barrier(threads_.value()),-1); + + // Wait for our siblings to synch before continuing if( _wait ) { barrier_->wait(); } - delete barrier_; } - - // Ensure we have a valid thread count. - if( ! threads_.value() ) + else { - return -1; + // Create the initial barrier. + ACE_NEW_RETURN(barrier_,ACE_Barrier(threads_.value()),-1); } - // Create the actual barrier. Note that we initialize it with - // threads_.value() to set its internal thread count. If the - // 'new' fails we will return -1 to the caller. - ACE_NEW_RETURN(barrier_,ACE_Barrier(threads_.value()),-1); - return 0; } diff --git a/docs/tutorials/017/Barrier_i.h b/docs/tutorials/017/Barrier_i.h index 8f96ac66e0f..5d86ce007af 100644 --- a/docs/tutorials/017/Barrier_i.h +++ b/docs/tutorials/017/Barrier_i.h @@ -38,7 +38,10 @@ public: // done() will invoke wait(). Before returning though, it will // delete the barrier_ pointer below to reclaim some memory. int done (void); - + + // Reset the owning thread of the barrier. + void owner( ACE_thread_t _owner ); + protected: // The number of threads we're synching ACE_Atomic_Op<ACE_Mutex, u_int> threads_; @@ -46,6 +49,12 @@ protected: // The ACE_Barrier that does all of the work ACE_Barrier *barrier_; + // If we mutate the number of threads we have to do some black magic + // to make sure there isn't a memory leak. These two member + // variables are a part of that magic. + ACE_Barrier *new_barrier_; + ACE_Mutex barrier_mutex_; + // The thread which created the Barrier in the first place. Only // this thread can change the threads_ value. ACE_thread_t owner_; diff --git a/docs/tutorials/017/Makefile b/docs/tutorials/017/Makefile index ac6e43168e1..32d54bbfc70 100644 --- a/docs/tutorials/017/Makefile +++ b/docs/tutorials/017/Makefile @@ -5,14 +5,14 @@ # Local macros #---------------------------------------------------------------------------- -BIN = barrier +BIN = barrier barrier2 FILES = Barrier_i BUILD = $(VBIN) -SRC = $(addsuffix .cpp,$(BIN)) -SRC += $(addsuffix .cpp,$(FILES)) +LSRC = $(addsuffix .cpp,$(BIN)) +SRC = $(addsuffix .cpp,$(FILES)) #---------------------------------------------------------------------------- # Include macros and targets @@ -62,7 +62,7 @@ HTML : # SHAR : # [ ! -f combine.shar ] || exit 1 - shar -T hdr bodies *.pre > combine.shar && rm -f hdr bodies *.pre + shar -T hdr bodies *.pre *.pst > combine.shar && rm -f hdr bodies *.pre *.pst UNSHAR : # sh combine.shar diff --git a/docs/tutorials/017/barrier.cpp b/docs/tutorials/017/barrier.cpp index 4386ad83ad8..eb9a5c14586 100644 --- a/docs/tutorials/017/barrier.cpp +++ b/docs/tutorials/017/barrier.cpp @@ -11,30 +11,63 @@ class Test : public ACE_Task<ACE_NULL_SYNCH> { public: - // Open the object with a few threads - int open(int _threads); + // Construct the object with a desired thread count + Test(int _threads); + + // Open/begin the test. As usual, we have to match the + // ACE_Task signature. + int open(void * _unused = 0); + + // Change the threads_ value for the next invocation of open() + void threads(int _threads); + + // Get the current threads_ value. + int threads(void); // Perform the test int svc(void); protected: + // How many threads the barrier will test. + int threads_; + // The Barrier object we'll use in our tests below Barrier barrier_; }; +/* Construct the object & initialize the threads value for open() to + use. +*/ +Test::Test(int _threads) + : threads_(_threads) +{ +} + /* As usual, our open() will create one or more threads where we'll do the interesting work. */ -int Test::open( int _threads ) +int Test::open(void * _unused) { + ACE_UNUSED_ARG(_unused); + // One thing about the barrier: You have to tell it how many // threads it will be synching. The threads() mutator on my // Barrier class lets you do that and hides the implementation // details at the same time. - barrier_.threads(_threads); + barrier_.threads(threads_); // Activate the tasks as usual... - return this->activate(THR_NEW_LWP, _threads); + return this->activate(THR_NEW_LWP, threads_); +} + +void Test::threads(int _threads) +{ + threads_ = _threads; +} + +int Test::threads(void) +{ + return threads_; } /* svc() will execute in each thread & do a few things with the @@ -88,7 +121,7 @@ int Test::svc(void) // actually invokes wait() but before returning here, it will // clean up a few resources. The goal is to prevent carrying // around objects you don't need. - if( barrier_.wait() == -1 ) + if( barrier_.done() == -1 ) { ACE_DEBUG ((LM_INFO, "(%P|%t|%T)\tbarrier_.done() failed!\n")); return 0; @@ -113,16 +146,17 @@ int Test::svc(void) */ int main(int, char**) { - // Create the test object - Test test; + // Create the test object with 10 threads + Test test(10); - // and open it with 10 threads. - test.open(10); + // and open it to test the barrier. + test.open(); // Now wait for them all to exit. test.wait(); // Re-open the Test object with just 5 threads - test.open(5); + test.threads(5); + test.open(); // and wait for them to complete also. test.wait(); diff --git a/docs/tutorials/017/barrier2.cpp b/docs/tutorials/017/barrier2.cpp new file mode 100644 index 00000000000..4fce4e07e42 --- /dev/null +++ b/docs/tutorials/017/barrier2.cpp @@ -0,0 +1,178 @@ + +// $Id$ + +#include "Barrier_i.h" +#include "ace/Task.h" + +/* We'll use a simple Task<> derivative to test our new Barrier + object. +*/ +class Test : public ACE_Task<ACE_NULL_SYNCH> +{ +public: + + // Construct the object with a desired thread count + Test(int _threads); + + // Open/begin the test. As usual, we have to match the + // ACE_Task signature. + int open(void * _unused = 0); + + // Change the threads_ value for the next invocation of open() + void threads(int _threads); + + // Get the current threads_ value. + int threads(void); + + // Perform the test + int svc(void); + +protected: + // How many threads the barrier will test. + u_int threads_; + + // The Barrier object we'll use in our tests below + Barrier barrier_; + + // This lets us pick one (eg -- the first) thread as the + // "controller" for our little test... + ACE_Atomic_Op<ACE_Mutex,u_int> tcount_; +}; + +/* Construct the object & initialize the threads value for open() to + use. +*/ +Test::Test(int _threads) + : threads_(_threads), tcount_(0) +{ +} + +/* As usual, our open() will create one or more threads where we'll do + the interesting work. +*/ +int Test::open(void * _unused) +{ + ACE_UNUSED_ARG(_unused); + + // One thing about the barrier: You have to tell it how many + // threads it will be synching. The threads() mutator on my + // Barrier class lets you do that and hides the implementation + // details at the same time. + barrier_.threads(threads_); + + // Activate the tasks as usual... + return this->activate(THR_NEW_LWP, threads_, 1); +} + +void Test::threads(int _threads) +{ + threads_ = _threads; +} + +int Test::threads(void) +{ + return threads_; +} + +/* svc() will execute in each thread & do a few things with the + Barrier we have. + */ +int Test::svc(void) +{ + // Say hello to everyone first. + ACE_DEBUG(( LM_INFO, "(%P|%t|%T) Created\n" )); + + // Increment and save the "tcount" value. We'll use it in + // just a moment... + int me = ++tcount_; + + // Wait for all initial threads to get to this point before we + // go any further. This is standard barrier usage... + barrier_.wait(); + + // Setup our random number generator. + ACE_Time_Value now(ACE_OS::gettimeofday()); + ACE_RANDR_TYPE seed = now.usec(); + ACE_OS::srand(seed); + int delay; + + // We'll arbitrarily choose the first activated thread to be + // the controller. After it sleeps a few seconds, it will add + // five threads. + if( me == 1 ) + { + // Sleep from 1 to 10 seconds so that some of the other + // threads will be into their for() loop. + delay = ACE_OS::rand_r(seed)%10; + ACE_OS::sleep(abs(delay)+1); + + // Make ourselves the barrier owner so that we can change + // the number of threads. This should be done with care... + barrier_.owner( ACE_OS::thr_self() ); + + // Add 5 threads to the barrier and then activate() to + // make them real. Notice the third parameter to + // activate(). Without this parameter, the threads won't + // be created. + if( barrier_.threads(threads_+5) == 0 ) + { + this->activate(THR_NEW_LWP,5,1); + } + } + + // This for() loop represents an "infinite" work loop in an + // application. The theory is that the threads are dividing up + // some work but need to "recalibrate" if more threads are + // added. I'll just do five iterations so that the test + // doesn't run forever. + int i; + for( i = 0 ; i < 5 ; ++i ) + { + // The sleep() represents time doing work. + delay = ACE_OS::rand_r(seed)%7; + ACE_OS::sleep(abs(delay)+1); + + ACE_DEBUG(( LM_INFO, "(%P|%t|%T)\tThread %.2d of %.2d iteration %.2d\n", me, threads_, i )); + + // If the local threads_ variable doesn't match the number + // in the barrier, then the controller must have changed + // the thread count. We'll wait() for everyone and then + // recalibrate ourselves before continuing. + if( this->threads_ != barrier_.threads() ) + { + ACE_DEBUG(( LM_INFO, "(%P|%t|%T) Waiting for thread count to increase to %d from %d\n", + barrier_.threads(), this->threads_ )); + + // Wait for all our sibling threads... + barrier_.wait(); + + // Set our local variable so that we don't come here again. + this->threads_ = barrier_.threads(); + + // Recalibration can be anything you want. At this + // point, we know that all of the threads are synch'd + // and ready to go. + } + } + + // Re-synch all of the threads before they exit. This isn't + // really necessary but I like to do it. + barrier_.done(); + + return(0); +} + +/* Our test application... + */ +int main(int, char**) +{ + // Create the test object with 5 threads + Test test(5); + + // and open it to test the barrier. + test.open(); + // Now wait for them all to exit. + test.wait(); + + return(0); +} diff --git a/docs/tutorials/017/combine.shar b/docs/tutorials/017/combine.shar index ff6f7ef7d2d..9595308932d 100644 --- a/docs/tutorials/017/combine.shar +++ b/docs/tutorials/017/combine.shar @@ -3,7 +3,7 @@ # To extract the files from this archive, save it to some FILE, remove # everything before the `!/bin/sh' line above, then type `sh FILE'. # -# Made on 1999-02-14 14:25 EST by <jcej@chiroptera.tragus.org>. +# Made on 1999-02-24 16:56 EST by <jcej@chiroptera.tragus.org>. # Source directory was `/var/home/jcej/projects/ACE_wrappers/docs/tutorials/017'. # # Existing files will *not* be overwritten unless `-c' is specified. @@ -12,12 +12,14 @@ # length mode name # ------ ---------- ------------------------------------------ # 422 -rw-rw-r-- hdr -# 45 -rw-rw-r-- bodies +# 64 -rw-rw-r-- bodies # 1397 -rw-rw-r-- page01.pre # 420 -rw-rw-r-- page02.pre # 739 -rw-rw-r-- page03.pre # 479 -rw-rw-r-- page04.pre # 375 -rw-rw-r-- page05.pre +# 373 -rw-rw-r-- page06.pre +# 216 -rw-rw-r-- page05.pst # save_IFS="${IFS}" IFS="${IFS}:" @@ -64,7 +66,7 @@ else fi rm -f 1231235999 $$.touch # -if mkdir _sh32003; then +if mkdir _sh29953; then $echo 'x -' 'creating lock directory' else $echo 'failed to create lock directory' @@ -116,20 +118,22 @@ PAGE=2 barrier.cpp Barrier_i.h Barrier_i.cpp +PAGE=6 +barrier2.cpp SHAR_EOF - $shar_touch -am 1110144198 'bodies' && + $shar_touch -am 0224165499 'bodies' && chmod 0664 'bodies' || $echo 'restore of' 'bodies' 'failed' if ( md5sum --help 2>&1 | grep 'sage: md5sum \[' ) >/dev/null 2>&1 \ && ( md5sum --version 2>&1 | grep -v 'textutils 1.12' ) >/dev/null; then md5sum -c << SHAR_EOF >/dev/null 2>&1 \ || $echo 'bodies:' 'MD5 check failed' -4924294a77d6ba78dcf667e92c341b4f bodies +b6fd04983b241794a9438df2ae77055c bodies SHAR_EOF else shar_count="`LC_ALL= LC_CTYPE= LANG= wc -c < 'bodies'`" - test 45 -eq "$shar_count" || - $echo 'bodies:' 'original size' '45,' 'current size' "$shar_count!" + test 64 -eq "$shar_count" || + $echo 'bodies:' 'original size' '64,' 'current size' "$shar_count!" fi fi # ============= page01.pre ============== @@ -316,5 +320,61 @@ SHAR_EOF $echo 'page05.pre:' 'original size' '375,' 'current size' "$shar_count!" fi fi -rm -fr _sh32003 +# ============= page06.pre ============== +if test -f 'page06.pre' && test "$first_param" != -c; then + $echo 'x -' SKIPPING 'page06.pre' '(file already exists)' +else + $echo 'x -' extracting 'page06.pre' '(text)' + sed 's/^X//' << 'SHAR_EOF' > 'page06.pre' && +I could have included this in the first Test object of the tutorial +but that may have complicated things a bit. What we're doing here is +recognizing when the "owner" thread adds more threads to the pool. +When we notice that, we use the barrier to wait until everything +stabilizes and then we recalibrate and move on. +<P> +The source is <A HREF="barrier2.cpp">here</A>. +<HR> +SHAR_EOF + $shar_touch -am 0224165499 'page06.pre' && + chmod 0664 'page06.pre' || + $echo 'restore of' 'page06.pre' 'failed' + if ( md5sum --help 2>&1 | grep 'sage: md5sum \[' ) >/dev/null 2>&1 \ + && ( md5sum --version 2>&1 | grep -v 'textutils 1.12' ) >/dev/null; then + md5sum -c << SHAR_EOF >/dev/null 2>&1 \ + || $echo 'page06.pre:' 'MD5 check failed' +ad87cd9f57af4c9b2c91cb32b484c0d1 page06.pre +SHAR_EOF + else + shar_count="`LC_ALL= LC_CTYPE= LANG= wc -c < 'page06.pre'`" + test 373 -eq "$shar_count" || + $echo 'page06.pre:' 'original size' '373,' 'current size' "$shar_count!" + fi +fi +# ============= page05.pst ============== +if test -f 'page05.pst' && test "$first_param" != -c; then + $echo 'x -' SKIPPING 'page05.pst' '(file already exists)' +else + $echo 'x -' extracting 'page05.pst' '(text)' + sed 's/^X//' << 'SHAR_EOF' > 'page05.pst' && +<HR> +Before we call it a wrap though, there's one more thing I want to show +you. Remember the comments around Barrier::threads()? On the next +page, I'll show you how to synch up when the number of threads changes. +SHAR_EOF + $shar_touch -am 0224165199 'page05.pst' && + chmod 0664 'page05.pst' || + $echo 'restore of' 'page05.pst' 'failed' + if ( md5sum --help 2>&1 | grep 'sage: md5sum \[' ) >/dev/null 2>&1 \ + && ( md5sum --version 2>&1 | grep -v 'textutils 1.12' ) >/dev/null; then + md5sum -c << SHAR_EOF >/dev/null 2>&1 \ + || $echo 'page05.pst:' 'MD5 check failed' +7ce8bbac0d211a3616b5052236e4983b page05.pst +SHAR_EOF + else + shar_count="`LC_ALL= LC_CTYPE= LANG= wc -c < 'page05.pst'`" + test 216 -eq "$shar_count" || + $echo 'page05.pst:' 'original size' '216,' 'current size' "$shar_count!" + fi +fi +rm -fr _sh29953 exit 0 diff --git a/docs/tutorials/017/page02.html b/docs/tutorials/017/page02.html index 5b7417bc4a9..b8762d9e392 100644 --- a/docs/tutorials/017/page02.html +++ b/docs/tutorials/017/page02.html @@ -35,30 +35,63 @@ class Test : public ACE_Task<ACE_NULL_SYNCH> { public: - <font color=red>// Open the object with a few threads</font> - int open(int _threads); + <font color=red>// Construct the object with a desired thread count</font> + Test(int _threads); + + <font color=red>// Open/begin the test. As usual, we have to match the</font> + <font color=red>// ACE_Task signature.</font> + int open(void * _unused = 0); + + <font color=red>// Change the threads_ value for the next invocation of open()</font> + void threads(int _threads); + + <font color=red>// Get the current threads_ value.</font> + int threads(void); <font color=red>// Perform the test</font> int svc(void); protected: + <font color=red>// How many threads the barrier will test.</font> + int threads_; + <font color=red>// The Barrier object we'll use in our tests below</font> Barrier barrier_; }; +<font color=red>/* Construct the object & initialize the threads value for open() to + use. +*/</font> +<font color=#008888>Test::Test</font>(int _threads) + : threads_(_threads) +{ +} + <font color=red>/* As usual, our open() will create one or more threads where we'll do the interesting work. */</font> -int <font color=#008888>Test::open</font>( int _threads ) +int <font color=#008888>Test::open</font>(void * _unused) { + ACE_UNUSED_ARG(_unused); + <font color=red>// One thing about the barrier: You have to tell it how many</font> <font color=red>// threads it will be synching. The threads() mutator on my</font> <font color=red>// Barrier class lets you do that and hides the implementation </font> <font color=red>// details at the same time.</font> - barrier_.threads(_threads); + barrier_.threads(threads_); <font color=red>// Activate the tasks as usual...</font> - return this->activate(THR_NEW_LWP, _threads); + return this->activate(THR_NEW_LWP, threads_); +} + +void <font color=#008888>Test::threads</font>(int _threads) +{ + threads_ = _threads; +} + +int <font color=#008888>Test::threads</font>(void) +{ + return threads_; } <font color=red>/* svc() will execute in each thread & do a few things with the @@ -112,7 +145,7 @@ int <font color=#008888>Test::svc</font>(void) <font color=red>// actually invokes wait() but before returning here, it will </font> <font color=red>// clean up a few resources. The goal is to prevent carrying</font> <font color=red>// around objects you don't need.</font> - if( barrier_.wait() == -1 ) + if( barrier_.done() == -1 ) { ACE_DEBUG ((LM_INFO, "<font color=green>(%P|%t|%T)\tbarrier_.done() failed!\n</font>")); return 0; @@ -137,16 +170,17 @@ int <font color=#008888>Test::svc</font>(void) */</font> int main(int, char**) { - <font color=red>// Create the test object</font> - Test test; + <font color=red>// Create the test object with 10 threads</font> + Test test(10); - <font color=red>// and open it with 10 threads.</font> - test.open(10); + <font color=red>// and open it to test the barrier.</font> + test.open(); <font color=red>// Now wait for them all to exit.</font> test.wait(); <font color=red>// Re-open the Test object with just 5 threads</font> - test.open(5); + test.threads(5); + test.open(); <font color=red>// and wait for them to complete also.</font> test.wait(); diff --git a/docs/tutorials/017/page03.html b/docs/tutorials/017/page03.html index 904fc58701f..6350fb1ca1f 100644 --- a/docs/tutorials/017/page03.html +++ b/docs/tutorials/017/page03.html @@ -34,10 +34,10 @@ the Barrier object almost as a "synchronization guard". <font color=blue>#include</font> "<font color=green>ace/Synch.h</font>" <font color=red>/* Barrier is a simple wrapper for the ACE_Barrier synchronization - class. The ACE_Barrier is already pretty easy to use but I thought + class. The ACE_Barrier is already pretty easy to use but I thought I'd wrap it up to create just a bit more abstraction at the - application level. - */</font> + application level. */</font> + class Barrier { public: @@ -66,14 +66,23 @@ public: <font color=red>// done() will invoke wait(). Before returning though, it will</font> <font color=red>// delete the barrier_ pointer below to reclaim some memory.</font> int done (void); - + + <font color=red>// Reset the owning thread of the barrier.</font> + void owner( ACE_thread_t _owner ); + protected: <font color=red>// The number of threads we're synching</font> - ACE_Atomic_Op<ACE_Mutex,u_int> threads_; + ACE_Atomic_Op<ACE_Mutex, u_int> threads_; <font color=red>// The ACE_Barrier that does all of the work</font> ACE_Barrier *barrier_; + <font color=red>// If we mutate the number of threads we have to do some black magic </font> + <font color=red>// to make sure there isn't a memory leak. These two member</font> + <font color=red>// variables are a part of that magic.</font> + ACE_Barrier *new_barrier_; + ACE_Mutex barrier_mutex_; + <font color=red>// The thread which created the Barrier in the first place. Only</font> <font color=red>// this thread can change the threads_ value.</font> ACE_thread_t owner_; diff --git a/docs/tutorials/017/page04.html b/docs/tutorials/017/page04.html index 7032f4c2d3a..88ff60be565 100644 --- a/docs/tutorials/017/page04.html +++ b/docs/tutorials/017/page04.html @@ -33,6 +33,7 @@ locking that is just a bit more than what I wanted to present here. <font color=#008888>Barrier::Barrier</font>(void) : threads_(0) ,barrier_(0) + ,new_barrier_(0) { owner_ = <font color=#008888>ACE_OS::thr_self</font>(); } @@ -44,6 +45,11 @@ locking that is just a bit more than what I wanted to present here. delete barrier_; } +void <font color=#008888>Barrier::owner</font>( ACE_thread_t _owner ) +{ + owner_ = _owner; +} + <font color=red>// Report on the number of threads.</font> u_int <font color=#008888>Barrier::threads</font>(void) { @@ -88,6 +94,25 @@ int <font color=#008888>Barrier::wait</font>(void) return -1; } + <font color=red>// If the threads() mutator has been used, new_barrier_ will</font> + <font color=red>// point to a new ACE_Barrier instance. We'll use a</font> + <font color=red>// traditional double-check here to move that new object into</font> + <font color=red>// place and cleanup the old one.</font> + if( new_barrier_ ) + { + <font color=red>// mutex so that only one thread can do this part.</font> + ACE_Guard<ACE_Mutex> mutex(barrier_mutex_); + + <font color=red>// We only want the first thread to plug in the new barrier...</font> + if( new_barrier_ ) + { + <font color=red>// out with the old and in with the new.</font> + delete barrier_; + barrier_ = new_barrier_; + new_barrier_ = 0; + } + } + return barrier_->wait(); } @@ -123,27 +148,31 @@ int <font color=#008888>Barrier::done</font>(void) */</font> int <font color=#008888>Barrier::make_barrier</font>( int _wait ) { - <font color=red>// Wait for and delete any existing barrier.</font> + <font color=red>// Ensure we have a valid thread count.</font> + if( ! threads_.value() ) + { + return -1; + } + + <font color=red>// If a barrier already exists, we'll arrange for it to be</font> + <font color=red>// replaced through the wait() method above.</font> if( barrier_ ) { + <font color=red>// Create the new barrier that wait() will install for us.</font> + ACE_NEW_RETURN(new_barrier_,ACE_Barrier(threads_.value()),-1); + + <font color=red>// Wait for our siblings to synch before continuing</font> if( _wait ) { barrier_->wait(); } - delete barrier_; } - - <font color=red>// Ensure we have a valid thread count.</font> - if( ! threads_.value() ) + else { - return -1; + <font color=red>// Create the initial barrier.</font> + ACE_NEW_RETURN(barrier_,ACE_Barrier(threads_.value()),-1); } - <font color=red>// Create the actual barrier. Note that we initialize it with </font> - <font color=red>// threads_.value() to set its internal thread count. If the</font> - <font color=red>// 'new' fails we will return -1 to the caller.</font> - ACE_NEW_RETURN(barrier_,ACE_Barrier(threads_.value()),-1); - return 0; } </PRE> diff --git a/docs/tutorials/017/page05.html b/docs/tutorials/017/page05.html index f0fd213c487..c3aeb61918f 100644 --- a/docs/tutorials/017/page05.html +++ b/docs/tutorials/017/page05.html @@ -22,5 +22,8 @@ enhancements will gladly be integrated into the Tutorial. <LI><A HREF="Barrier_i.h">Barrier_i.h</A> <LI><A HREF="Barrier_i.cpp">Barrier_i.cpp</A> </UL> -<P><HR WIDTH="100%"> -<CENTER>[<A HREF="../online-tutorials.html">Tutorial Index</A>] </CENTER> +<HR> +Before we call it a wrap though, there's one more thing I want to show +you. Remember the comments around Barrier::threads()? On the next +page, I'll show you how to synch up when the number of threads changes.<P><HR WIDTH="100%"> +<CENTER>[<A HREF="../online-tutorials.html">Tutorial Index</A>] [<A HREF="page06.html">Continue This Tutorial</A>]</CENTER> diff --git a/docs/tutorials/017/page06.html b/docs/tutorials/017/page06.html new file mode 100644 index 00000000000..1d80cfbd4d4 --- /dev/null +++ b/docs/tutorials/017/page06.html @@ -0,0 +1,203 @@ +<HTML> +<HEAD> + <META HTTP-EQUIV="Content-Type" CONTENT="text/html; charset=iso-8859-1"> + <META NAME="Author" CONTENT="James CE Johnson"> + <TITLE>ACE Tutorial 017</TITLE> +</HEAD> +<BODY TEXT="#000000" BGCOLOR="#FFFFFF" LINK="#000FFF" VLINK="#FF0F0F"> + +<CENTER><B><FONT SIZE=+2>ACE Tutorial 017</FONT></B></CENTER> + +<CENTER><B><FONT SIZE=+2>Using the ACE_Barrier synch object</FONT></B></CENTER> + +<P> +<HR WIDTH="100%"> +I could have included this in the first Test object of the tutorial +but that may have complicated things a bit. What we're doing here is +recognizing when the "owner" thread adds more threads to the pool. +When we notice that, we use the barrier to wait until everything +stabilizes and then we recalibrate and move on. +<P> +The source is <A HREF="barrier2.cpp">here</A>. +<HR><PRE> + +<font color=red>// $Id$</font> + +<font color=blue>#include</font> "<font color=green>Barrier_i.h</font>" +<font color=blue>#include</font> "<font color=green>ace/Task.h</font>" + +<font color=red>/* We'll use a simple Task<> derivative to test our new Barrier + object. +*/</font> +class Test : public ACE_Task<ACE_NULL_SYNCH> +{ +public: + + <font color=red>// Construct the object with a desired thread count</font> + Test(int _threads); + + <font color=red>// Open/begin the test. As usual, we have to match the</font> + <font color=red>// ACE_Task signature.</font> + int open(void * _unused = 0); + + <font color=red>// Change the threads_ value for the next invocation of open()</font> + void threads(int _threads); + + <font color=red>// Get the current threads_ value.</font> + int threads(void); + + <font color=red>// Perform the test</font> + int svc(void); + +protected: + <font color=red>// How many threads the barrier will test.</font> + u_int threads_; + + <font color=red>// The Barrier object we'll use in our tests below</font> + Barrier barrier_; + + <font color=red>// This lets us pick one (eg -- the first) thread as the</font> + <font color=red>// "<font color=green>controller</font>" for our little test...</font> + ACE_Atomic_Op<ACE_Mutex,u_int> tcount_; +}; + +<font color=red>/* Construct the object & initialize the threads value for open() to + use. +*/</font> +<font color=#008888>Test::Test</font>(int _threads) + : threads_(_threads), tcount_(0) +{ +} + +<font color=red>/* As usual, our open() will create one or more threads where we'll do + the interesting work. +*/</font> +int <font color=#008888>Test::open</font>(void * _unused) +{ + ACE_UNUSED_ARG(_unused); + + <font color=red>// One thing about the barrier: You have to tell it how many</font> + <font color=red>// threads it will be synching. The threads() mutator on my</font> + <font color=red>// Barrier class lets you do that and hides the implementation </font> + <font color=red>// details at the same time.</font> + barrier_.threads(threads_); + + <font color=red>// Activate the tasks as usual...</font> + return this->activate(THR_NEW_LWP, threads_, 1); +} + +void <font color=#008888>Test::threads</font>(int _threads) +{ + threads_ = _threads; +} + +int <font color=#008888>Test::threads</font>(void) +{ + return threads_; +} + +<font color=red>/* svc() will execute in each thread & do a few things with the + Barrier we have. + */</font> +int <font color=#008888>Test::svc</font>(void) +{ + <font color=red>// Say hello to everyone first.</font> + ACE_DEBUG(( LM_INFO, "<font color=green>(%P|%t|%T) Created\n</font>" )); + + <font color=red>// Increment and save the "<font color=green>tcount</font>" value. We'll use it in</font> + <font color=red>// just a moment...</font> + int me = ++tcount_; + + <font color=red>// Wait for all initial threads to get to this point before we</font> + <font color=red>// go any further. This is standard barrier usage...</font> + barrier_.wait(); + + <font color=red>// Setup our random number generator.</font> + ACE_Time_Value now(<font color=#008888>ACE_OS::gettimeofday</font>()); + ACE_RANDR_TYPE seed = now.usec(); + <font color=#008888>ACE_OS::srand</font>(seed); + int delay; + + <font color=red>// We'll arbitrarily choose the first activated thread to be</font> + <font color=red>// the controller. After it sleeps a few seconds, it will add </font> + <font color=red>// five threads.</font> + if( me == 1 ) + { + <font color=red>// Sleep from 1 to 10 seconds so that some of the other</font> + <font color=red>// threads will be into their for() loop.</font> + delay = <font color=#008888>ACE_OS::rand_r</font>(seed)%10; + <font color=#008888>ACE_OS::sleep</font>(abs(delay)+1); + + <font color=red>// Make ourselves the barrier owner so that we can change</font> + <font color=red>// the number of threads. This should be done with care...</font> + barrier_.owner( <font color=#008888>ACE_OS::thr_self</font>() ); + + <font color=red>// Add 5 threads to the barrier and then activate() to</font> + <font color=red>// make them real. Notice the third parameter to</font> + <font color=red>// activate(). Without this parameter, the threads won't</font> + <font color=red>// be created.</font> + if( barrier_.threads(threads_+5) == 0 ) + { + this->activate(THR_NEW_LWP,5,1); + } + } + + <font color=red>// This for() loop represents an "<font color=green>infinite</font>" work loop in an</font> + <font color=red>// application. The theory is that the threads are dividing up </font> + <font color=red>// some work but need to "<font color=green>recalibrate</font>" if more threads are</font> + <font color=red>// added. I'll just do five iterations so that the test</font> + <font color=red>// doesn't run forever.</font> + int i; + for( i = 0 ; i < 5 ; ++i ) + { + <font color=red>// The sleep() represents time doing work.</font> + delay = <font color=#008888>ACE_OS::rand_r</font>(seed)%7; + <font color=#008888>ACE_OS::sleep</font>(abs(delay)+1); + + ACE_DEBUG(( LM_INFO, "<font color=green>(%P|%t|%T)\tThread %.2d of %.2d iteration %.2d\n</font>", me, threads_, i )); + + <font color=red>// If the local threads_ variable doesn't match the number </font> + <font color=red>// in the barrier, then the controller must have changed</font> + <font color=red>// the thread count. We'll wait() for everyone and then</font> + <font color=red>// recalibrate ourselves before continuing.</font> + if( this->threads_ != barrier_.threads() ) + { + ACE_DEBUG(( LM_INFO, "<font color=green>(%P|%t|%T) Waiting for thread count to increase to %d from %d\n</font>", + barrier_.threads(), this->threads_ )); + + <font color=red>// Wait for all our sibling threads...</font> + barrier_.wait(); + + <font color=red>// Set our local variable so that we don't come here again.</font> + this->threads_ = barrier_.threads(); + + <font color=red>// Recalibration can be anything you want. At this</font> + <font color=red>// point, we know that all of the threads are synch'd</font> + <font color=red>// and ready to go.</font> + } + } + + <font color=red>// Re-synch all of the threads before they exit. This isn't</font> + <font color=red>// really necessary but I like to do it.</font> + barrier_.done(); + + return(0); +} + +<font color=red>/* Our test application... + */</font> +int main(int, char**) +{ + <font color=red>// Create the test object with 5 threads</font> + Test test(5); + + <font color=red>// and open it to test the barrier.</font> + test.open(); + <font color=red>// Now wait for them all to exit.</font> + test.wait(); + + return(0); +} +</PRE> +<P><HR WIDTH="100%"> +<CENTER>[<A HREF="../online-tutorials.html">Tutorial Index</A>] </CENTER> |