From 26ebc93891c3d2296b5c148f8ee5061fc1969bc2 Mon Sep 17 00:00:00 2001 From: jcej Date: Wed, 24 Feb 1999 21:49:53 +0000 Subject: *** empty log message *** --- docs/tutorials/017/Barrier_i.cpp | 51 +++++++--- docs/tutorials/017/Barrier_i.h | 11 ++- docs/tutorials/017/Makefile | 8 +- docs/tutorials/017/barrier.cpp | 56 ++++++++--- docs/tutorials/017/barrier2.cpp | 178 ++++++++++++++++++++++++++++++++++ docs/tutorials/017/combine.shar | 76 +++++++++++++-- docs/tutorials/017/page02.html | 56 ++++++++--- docs/tutorials/017/page03.html | 19 +++- docs/tutorials/017/page04.html | 51 +++++++--- docs/tutorials/017/page05.html | 7 +- docs/tutorials/017/page06.html | 203 +++++++++++++++++++++++++++++++++++++++ 11 files changed, 652 insertions(+), 64 deletions(-) create mode 100644 docs/tutorials/017/barrier2.cpp create mode 100644 docs/tutorials/017/page06.html (limited to 'docs/tutorials') diff --git a/docs/tutorials/017/Barrier_i.cpp b/docs/tutorials/017/Barrier_i.cpp index 308b44cc397..c8d0c6ba5ed 100644 --- a/docs/tutorials/017/Barrier_i.cpp +++ b/docs/tutorials/017/Barrier_i.cpp @@ -10,6 +10,7 @@ Barrier::Barrier(void) : threads_(0) ,barrier_(0) + ,new_barrier_(0) { owner_ = ACE_OS::thr_self(); } @@ -21,6 +22,11 @@ Barrier::~Barrier(void) delete barrier_; } +void Barrier::owner( ACE_thread_t _owner ) +{ + owner_ = _owner; +} + // Report on the number of threads. u_int Barrier::threads(void) { @@ -65,6 +71,25 @@ int Barrier::wait(void) return -1; } + // If the threads() mutator has been used, new_barrier_ will + // point to a new ACE_Barrier instance. We'll use a + // traditional double-check here to move that new object into + // place and cleanup the old one. + if( new_barrier_ ) + { + // mutex so that only one thread can do this part. + ACE_Guard mutex(barrier_mutex_); + + // We only want the first thread to plug in the new barrier... + if( new_barrier_ ) + { + // out with the old and in with the new. + delete barrier_; + barrier_ = new_barrier_; + new_barrier_ = 0; + } + } + return barrier_->wait(); } @@ -100,26 +125,30 @@ int Barrier::done(void) */ int Barrier::make_barrier( int _wait ) { - // Wait for and delete any existing barrier. + // Ensure we have a valid thread count. + if( ! threads_.value() ) + { + return -1; + } + + // If a barrier already exists, we'll arrange for it to be + // replaced through the wait() method above. if( barrier_ ) { + // Create the new barrier that wait() will install for us. + ACE_NEW_RETURN(new_barrier_,ACE_Barrier(threads_.value()),-1); + + // Wait for our siblings to synch before continuing if( _wait ) { barrier_->wait(); } - delete barrier_; } - - // Ensure we have a valid thread count. - if( ! threads_.value() ) + else { - return -1; + // Create the initial barrier. + ACE_NEW_RETURN(barrier_,ACE_Barrier(threads_.value()),-1); } - // Create the actual barrier. Note that we initialize it with - // threads_.value() to set its internal thread count. If the - // 'new' fails we will return -1 to the caller. - ACE_NEW_RETURN(barrier_,ACE_Barrier(threads_.value()),-1); - return 0; } diff --git a/docs/tutorials/017/Barrier_i.h b/docs/tutorials/017/Barrier_i.h index 8f96ac66e0f..5d86ce007af 100644 --- a/docs/tutorials/017/Barrier_i.h +++ b/docs/tutorials/017/Barrier_i.h @@ -38,7 +38,10 @@ public: // done() will invoke wait(). Before returning though, it will // delete the barrier_ pointer below to reclaim some memory. int done (void); - + + // Reset the owning thread of the barrier. + void owner( ACE_thread_t _owner ); + protected: // The number of threads we're synching ACE_Atomic_Op threads_; @@ -46,6 +49,12 @@ protected: // The ACE_Barrier that does all of the work ACE_Barrier *barrier_; + // If we mutate the number of threads we have to do some black magic + // to make sure there isn't a memory leak. These two member + // variables are a part of that magic. + ACE_Barrier *new_barrier_; + ACE_Mutex barrier_mutex_; + // The thread which created the Barrier in the first place. Only // this thread can change the threads_ value. ACE_thread_t owner_; diff --git a/docs/tutorials/017/Makefile b/docs/tutorials/017/Makefile index ac6e43168e1..32d54bbfc70 100644 --- a/docs/tutorials/017/Makefile +++ b/docs/tutorials/017/Makefile @@ -5,14 +5,14 @@ # Local macros #---------------------------------------------------------------------------- -BIN = barrier +BIN = barrier barrier2 FILES = Barrier_i BUILD = $(VBIN) -SRC = $(addsuffix .cpp,$(BIN)) -SRC += $(addsuffix .cpp,$(FILES)) +LSRC = $(addsuffix .cpp,$(BIN)) +SRC = $(addsuffix .cpp,$(FILES)) #---------------------------------------------------------------------------- # Include macros and targets @@ -62,7 +62,7 @@ HTML : # SHAR : # [ ! -f combine.shar ] || exit 1 - shar -T hdr bodies *.pre > combine.shar && rm -f hdr bodies *.pre + shar -T hdr bodies *.pre *.pst > combine.shar && rm -f hdr bodies *.pre *.pst UNSHAR : # sh combine.shar diff --git a/docs/tutorials/017/barrier.cpp b/docs/tutorials/017/barrier.cpp index 4386ad83ad8..eb9a5c14586 100644 --- a/docs/tutorials/017/barrier.cpp +++ b/docs/tutorials/017/barrier.cpp @@ -11,30 +11,63 @@ class Test : public ACE_Task { public: - // Open the object with a few threads - int open(int _threads); + // Construct the object with a desired thread count + Test(int _threads); + + // Open/begin the test. As usual, we have to match the + // ACE_Task signature. + int open(void * _unused = 0); + + // Change the threads_ value for the next invocation of open() + void threads(int _threads); + + // Get the current threads_ value. + int threads(void); // Perform the test int svc(void); protected: + // How many threads the barrier will test. + int threads_; + // The Barrier object we'll use in our tests below Barrier barrier_; }; +/* Construct the object & initialize the threads value for open() to + use. +*/ +Test::Test(int _threads) + : threads_(_threads) +{ +} + /* As usual, our open() will create one or more threads where we'll do the interesting work. */ -int Test::open( int _threads ) +int Test::open(void * _unused) { + ACE_UNUSED_ARG(_unused); + // One thing about the barrier: You have to tell it how many // threads it will be synching. The threads() mutator on my // Barrier class lets you do that and hides the implementation // details at the same time. - barrier_.threads(_threads); + barrier_.threads(threads_); // Activate the tasks as usual... - return this->activate(THR_NEW_LWP, _threads); + return this->activate(THR_NEW_LWP, threads_); +} + +void Test::threads(int _threads) +{ + threads_ = _threads; +} + +int Test::threads(void) +{ + return threads_; } /* svc() will execute in each thread & do a few things with the @@ -88,7 +121,7 @@ int Test::svc(void) // actually invokes wait() but before returning here, it will // clean up a few resources. The goal is to prevent carrying // around objects you don't need. - if( barrier_.wait() == -1 ) + if( barrier_.done() == -1 ) { ACE_DEBUG ((LM_INFO, "(%P|%t|%T)\tbarrier_.done() failed!\n")); return 0; @@ -113,16 +146,17 @@ int Test::svc(void) */ int main(int, char**) { - // Create the test object - Test test; + // Create the test object with 10 threads + Test test(10); - // and open it with 10 threads. - test.open(10); + // and open it to test the barrier. + test.open(); // Now wait for them all to exit. test.wait(); // Re-open the Test object with just 5 threads - test.open(5); + test.threads(5); + test.open(); // and wait for them to complete also. test.wait(); diff --git a/docs/tutorials/017/barrier2.cpp b/docs/tutorials/017/barrier2.cpp new file mode 100644 index 00000000000..4fce4e07e42 --- /dev/null +++ b/docs/tutorials/017/barrier2.cpp @@ -0,0 +1,178 @@ + +// $Id$ + +#include "Barrier_i.h" +#include "ace/Task.h" + +/* We'll use a simple Task<> derivative to test our new Barrier + object. +*/ +class Test : public ACE_Task +{ +public: + + // Construct the object with a desired thread count + Test(int _threads); + + // Open/begin the test. As usual, we have to match the + // ACE_Task signature. + int open(void * _unused = 0); + + // Change the threads_ value for the next invocation of open() + void threads(int _threads); + + // Get the current threads_ value. + int threads(void); + + // Perform the test + int svc(void); + +protected: + // How many threads the barrier will test. + u_int threads_; + + // The Barrier object we'll use in our tests below + Barrier barrier_; + + // This lets us pick one (eg -- the first) thread as the + // "controller" for our little test... + ACE_Atomic_Op tcount_; +}; + +/* Construct the object & initialize the threads value for open() to + use. +*/ +Test::Test(int _threads) + : threads_(_threads), tcount_(0) +{ +} + +/* As usual, our open() will create one or more threads where we'll do + the interesting work. +*/ +int Test::open(void * _unused) +{ + ACE_UNUSED_ARG(_unused); + + // One thing about the barrier: You have to tell it how many + // threads it will be synching. The threads() mutator on my + // Barrier class lets you do that and hides the implementation + // details at the same time. + barrier_.threads(threads_); + + // Activate the tasks as usual... + return this->activate(THR_NEW_LWP, threads_, 1); +} + +void Test::threads(int _threads) +{ + threads_ = _threads; +} + +int Test::threads(void) +{ + return threads_; +} + +/* svc() will execute in each thread & do a few things with the + Barrier we have. + */ +int Test::svc(void) +{ + // Say hello to everyone first. + ACE_DEBUG(( LM_INFO, "(%P|%t|%T) Created\n" )); + + // Increment and save the "tcount" value. We'll use it in + // just a moment... + int me = ++tcount_; + + // Wait for all initial threads to get to this point before we + // go any further. This is standard barrier usage... + barrier_.wait(); + + // Setup our random number generator. + ACE_Time_Value now(ACE_OS::gettimeofday()); + ACE_RANDR_TYPE seed = now.usec(); + ACE_OS::srand(seed); + int delay; + + // We'll arbitrarily choose the first activated thread to be + // the controller. After it sleeps a few seconds, it will add + // five threads. + if( me == 1 ) + { + // Sleep from 1 to 10 seconds so that some of the other + // threads will be into their for() loop. + delay = ACE_OS::rand_r(seed)%10; + ACE_OS::sleep(abs(delay)+1); + + // Make ourselves the barrier owner so that we can change + // the number of threads. This should be done with care... + barrier_.owner( ACE_OS::thr_self() ); + + // Add 5 threads to the barrier and then activate() to + // make them real. Notice the third parameter to + // activate(). Without this parameter, the threads won't + // be created. + if( barrier_.threads(threads_+5) == 0 ) + { + this->activate(THR_NEW_LWP,5,1); + } + } + + // This for() loop represents an "infinite" work loop in an + // application. The theory is that the threads are dividing up + // some work but need to "recalibrate" if more threads are + // added. I'll just do five iterations so that the test + // doesn't run forever. + int i; + for( i = 0 ; i < 5 ; ++i ) + { + // The sleep() represents time doing work. + delay = ACE_OS::rand_r(seed)%7; + ACE_OS::sleep(abs(delay)+1); + + ACE_DEBUG(( LM_INFO, "(%P|%t|%T)\tThread %.2d of %.2d iteration %.2d\n", me, threads_, i )); + + // If the local threads_ variable doesn't match the number + // in the barrier, then the controller must have changed + // the thread count. We'll wait() for everyone and then + // recalibrate ourselves before continuing. + if( this->threads_ != barrier_.threads() ) + { + ACE_DEBUG(( LM_INFO, "(%P|%t|%T) Waiting for thread count to increase to %d from %d\n", + barrier_.threads(), this->threads_ )); + + // Wait for all our sibling threads... + barrier_.wait(); + + // Set our local variable so that we don't come here again. + this->threads_ = barrier_.threads(); + + // Recalibration can be anything you want. At this + // point, we know that all of the threads are synch'd + // and ready to go. + } + } + + // Re-synch all of the threads before they exit. This isn't + // really necessary but I like to do it. + barrier_.done(); + + return(0); +} + +/* Our test application... + */ +int main(int, char**) +{ + // Create the test object with 5 threads + Test test(5); + + // and open it to test the barrier. + test.open(); + // Now wait for them all to exit. + test.wait(); + + return(0); +} diff --git a/docs/tutorials/017/combine.shar b/docs/tutorials/017/combine.shar index ff6f7ef7d2d..9595308932d 100644 --- a/docs/tutorials/017/combine.shar +++ b/docs/tutorials/017/combine.shar @@ -3,7 +3,7 @@ # To extract the files from this archive, save it to some FILE, remove # everything before the `!/bin/sh' line above, then type `sh FILE'. # -# Made on 1999-02-14 14:25 EST by . +# Made on 1999-02-24 16:56 EST by . # Source directory was `/var/home/jcej/projects/ACE_wrappers/docs/tutorials/017'. # # Existing files will *not* be overwritten unless `-c' is specified. @@ -12,12 +12,14 @@ # length mode name # ------ ---------- ------------------------------------------ # 422 -rw-rw-r-- hdr -# 45 -rw-rw-r-- bodies +# 64 -rw-rw-r-- bodies # 1397 -rw-rw-r-- page01.pre # 420 -rw-rw-r-- page02.pre # 739 -rw-rw-r-- page03.pre # 479 -rw-rw-r-- page04.pre # 375 -rw-rw-r-- page05.pre +# 373 -rw-rw-r-- page06.pre +# 216 -rw-rw-r-- page05.pst # save_IFS="${IFS}" IFS="${IFS}:" @@ -64,7 +66,7 @@ else fi rm -f 1231235999 $$.touch # -if mkdir _sh32003; then +if mkdir _sh29953; then $echo 'x -' 'creating lock directory' else $echo 'failed to create lock directory' @@ -116,20 +118,22 @@ PAGE=2 barrier.cpp Barrier_i.h Barrier_i.cpp +PAGE=6 +barrier2.cpp SHAR_EOF - $shar_touch -am 1110144198 'bodies' && + $shar_touch -am 0224165499 'bodies' && chmod 0664 'bodies' || $echo 'restore of' 'bodies' 'failed' if ( md5sum --help 2>&1 | grep 'sage: md5sum \[' ) >/dev/null 2>&1 \ && ( md5sum --version 2>&1 | grep -v 'textutils 1.12' ) >/dev/null; then md5sum -c << SHAR_EOF >/dev/null 2>&1 \ || $echo 'bodies:' 'MD5 check failed' -4924294a77d6ba78dcf667e92c341b4f bodies +b6fd04983b241794a9438df2ae77055c bodies SHAR_EOF else shar_count="`LC_ALL= LC_CTYPE= LANG= wc -c < 'bodies'`" - test 45 -eq "$shar_count" || - $echo 'bodies:' 'original size' '45,' 'current size' "$shar_count!" + test 64 -eq "$shar_count" || + $echo 'bodies:' 'original size' '64,' 'current size' "$shar_count!" fi fi # ============= page01.pre ============== @@ -316,5 +320,61 @@ SHAR_EOF $echo 'page05.pre:' 'original size' '375,' 'current size' "$shar_count!" fi fi -rm -fr _sh32003 +# ============= page06.pre ============== +if test -f 'page06.pre' && test "$first_param" != -c; then + $echo 'x -' SKIPPING 'page06.pre' '(file already exists)' +else + $echo 'x -' extracting 'page06.pre' '(text)' + sed 's/^X//' << 'SHAR_EOF' > 'page06.pre' && +I could have included this in the first Test object of the tutorial +but that may have complicated things a bit. What we're doing here is +recognizing when the "owner" thread adds more threads to the pool. +When we notice that, we use the barrier to wait until everything +stabilizes and then we recalibrate and move on. +

+The source is here. +


+SHAR_EOF + $shar_touch -am 0224165499 'page06.pre' && + chmod 0664 'page06.pre' || + $echo 'restore of' 'page06.pre' 'failed' + if ( md5sum --help 2>&1 | grep 'sage: md5sum \[' ) >/dev/null 2>&1 \ + && ( md5sum --version 2>&1 | grep -v 'textutils 1.12' ) >/dev/null; then + md5sum -c << SHAR_EOF >/dev/null 2>&1 \ + || $echo 'page06.pre:' 'MD5 check failed' +ad87cd9f57af4c9b2c91cb32b484c0d1 page06.pre +SHAR_EOF + else + shar_count="`LC_ALL= LC_CTYPE= LANG= wc -c < 'page06.pre'`" + test 373 -eq "$shar_count" || + $echo 'page06.pre:' 'original size' '373,' 'current size' "$shar_count!" + fi +fi +# ============= page05.pst ============== +if test -f 'page05.pst' && test "$first_param" != -c; then + $echo 'x -' SKIPPING 'page05.pst' '(file already exists)' +else + $echo 'x -' extracting 'page05.pst' '(text)' + sed 's/^X//' << 'SHAR_EOF' > 'page05.pst' && +
+Before we call it a wrap though, there's one more thing I want to show +you. Remember the comments around Barrier::threads()? On the next +page, I'll show you how to synch up when the number of threads changes. +SHAR_EOF + $shar_touch -am 0224165199 'page05.pst' && + chmod 0664 'page05.pst' || + $echo 'restore of' 'page05.pst' 'failed' + if ( md5sum --help 2>&1 | grep 'sage: md5sum \[' ) >/dev/null 2>&1 \ + && ( md5sum --version 2>&1 | grep -v 'textutils 1.12' ) >/dev/null; then + md5sum -c << SHAR_EOF >/dev/null 2>&1 \ + || $echo 'page05.pst:' 'MD5 check failed' +7ce8bbac0d211a3616b5052236e4983b page05.pst +SHAR_EOF + else + shar_count="`LC_ALL= LC_CTYPE= LANG= wc -c < 'page05.pst'`" + test 216 -eq "$shar_count" || + $echo 'page05.pst:' 'original size' '216,' 'current size' "$shar_count!" + fi +fi +rm -fr _sh29953 exit 0 diff --git a/docs/tutorials/017/page02.html b/docs/tutorials/017/page02.html index 5b7417bc4a9..b8762d9e392 100644 --- a/docs/tutorials/017/page02.html +++ b/docs/tutorials/017/page02.html @@ -35,30 +35,63 @@ class Test : public ACE_Task<ACE_NULL_SYNCH> { public: - // Open the object with a few threads - int open(int _threads); + // Construct the object with a desired thread count + Test(int _threads); + + // Open/begin the test. As usual, we have to match the + // ACE_Task signature. + int open(void * _unused = 0); + + // Change the threads_ value for the next invocation of open() + void threads(int _threads); + + // Get the current threads_ value. + int threads(void); // Perform the test int svc(void); protected: + // How many threads the barrier will test. + int threads_; + // The Barrier object we'll use in our tests below Barrier barrier_; }; +/* Construct the object & initialize the threads value for open() to + use. +*/ +Test::Test(int _threads) + : threads_(_threads) +{ +} + /* As usual, our open() will create one or more threads where we'll do the interesting work. */ -int Test::open( int _threads ) +int Test::open(void * _unused) { + ACE_UNUSED_ARG(_unused); + // One thing about the barrier: You have to tell it how many // threads it will be synching. The threads() mutator on my // Barrier class lets you do that and hides the implementation // details at the same time. - barrier_.threads(_threads); + barrier_.threads(threads_); // Activate the tasks as usual... - return this->activate(THR_NEW_LWP, _threads); + return this->activate(THR_NEW_LWP, threads_); +} + +void Test::threads(int _threads) +{ + threads_ = _threads; +} + +int Test::threads(void) +{ + return threads_; } /* svc() will execute in each thread & do a few things with the @@ -112,7 +145,7 @@ int Test::svc(void) // actually invokes wait() but before returning here, it will // clean up a few resources. The goal is to prevent carrying // around objects you don't need. - if( barrier_.wait() == -1 ) + if( barrier_.done() == -1 ) { ACE_DEBUG ((LM_INFO, "(%P|%t|%T)\tbarrier_.done() failed!\n")); return 0; @@ -137,16 +170,17 @@ int Test::svc(void) */ int main(int, char**) { - // Create the test object - Test test; + // Create the test object with 10 threads + Test test(10); - // and open it with 10 threads. - test.open(10); + // and open it to test the barrier. + test.open(); // Now wait for them all to exit. test.wait(); // Re-open the Test object with just 5 threads - test.open(5); + test.threads(5); + test.open(); // and wait for them to complete also. test.wait(); diff --git a/docs/tutorials/017/page03.html b/docs/tutorials/017/page03.html index 904fc58701f..6350fb1ca1f 100644 --- a/docs/tutorials/017/page03.html +++ b/docs/tutorials/017/page03.html @@ -34,10 +34,10 @@ the Barrier object almost as a "synchronization guard". #include "ace/Synch.h" /* Barrier is a simple wrapper for the ACE_Barrier synchronization - class. The ACE_Barrier is already pretty easy to use but I thought + class. The ACE_Barrier is already pretty easy to use but I thought I'd wrap it up to create just a bit more abstraction at the - application level. - */ + application level. */ + class Barrier { public: @@ -66,14 +66,23 @@ public: // done() will invoke wait(). Before returning though, it will // delete the barrier_ pointer below to reclaim some memory. int done (void); - + + // Reset the owning thread of the barrier. + void owner( ACE_thread_t _owner ); + protected: // The number of threads we're synching - ACE_Atomic_Op<ACE_Mutex,u_int> threads_; + ACE_Atomic_Op<ACE_Mutex, u_int> threads_; // The ACE_Barrier that does all of the work ACE_Barrier *barrier_; + // If we mutate the number of threads we have to do some black magic + // to make sure there isn't a memory leak. These two member + // variables are a part of that magic. + ACE_Barrier *new_barrier_; + ACE_Mutex barrier_mutex_; + // The thread which created the Barrier in the first place. Only // this thread can change the threads_ value. ACE_thread_t owner_; diff --git a/docs/tutorials/017/page04.html b/docs/tutorials/017/page04.html index 7032f4c2d3a..88ff60be565 100644 --- a/docs/tutorials/017/page04.html +++ b/docs/tutorials/017/page04.html @@ -33,6 +33,7 @@ locking that is just a bit more than what I wanted to present here. Barrier::Barrier(void) : threads_(0) ,barrier_(0) + ,new_barrier_(0) { owner_ = ACE_OS::thr_self(); } @@ -44,6 +45,11 @@ locking that is just a bit more than what I wanted to present here. delete barrier_; } +void Barrier::owner( ACE_thread_t _owner ) +{ + owner_ = _owner; +} + // Report on the number of threads. u_int Barrier::threads(void) { @@ -88,6 +94,25 @@ int Barrier::wait(void) return -1; } + // If the threads() mutator has been used, new_barrier_ will + // point to a new ACE_Barrier instance. We'll use a + // traditional double-check here to move that new object into + // place and cleanup the old one. + if( new_barrier_ ) + { + // mutex so that only one thread can do this part. + ACE_Guard<ACE_Mutex> mutex(barrier_mutex_); + + // We only want the first thread to plug in the new barrier... + if( new_barrier_ ) + { + // out with the old and in with the new. + delete barrier_; + barrier_ = new_barrier_; + new_barrier_ = 0; + } + } + return barrier_->wait(); } @@ -123,27 +148,31 @@ int Barrier::done(void) */ int Barrier::make_barrier( int _wait ) { - // Wait for and delete any existing barrier. + // Ensure we have a valid thread count. + if( ! threads_.value() ) + { + return -1; + } + + // If a barrier already exists, we'll arrange for it to be + // replaced through the wait() method above. if( barrier_ ) { + // Create the new barrier that wait() will install for us. + ACE_NEW_RETURN(new_barrier_,ACE_Barrier(threads_.value()),-1); + + // Wait for our siblings to synch before continuing if( _wait ) { barrier_->wait(); } - delete barrier_; } - - // Ensure we have a valid thread count. - if( ! threads_.value() ) + else { - return -1; + // Create the initial barrier. + ACE_NEW_RETURN(barrier_,ACE_Barrier(threads_.value()),-1); } - // Create the actual barrier. Note that we initialize it with - // threads_.value() to set its internal thread count. If the - // 'new' fails we will return -1 to the caller. - ACE_NEW_RETURN(barrier_,ACE_Barrier(threads_.value()),-1); - return 0; } diff --git a/docs/tutorials/017/page05.html b/docs/tutorials/017/page05.html index f0fd213c487..c3aeb61918f 100644 --- a/docs/tutorials/017/page05.html +++ b/docs/tutorials/017/page05.html @@ -22,5 +22,8 @@ enhancements will gladly be integrated into the Tutorial.
  • Barrier_i.h
  • Barrier_i.cpp -


    -
    [Tutorial Index]
    +
    +Before we call it a wrap though, there's one more thing I want to show +you. Remember the comments around Barrier::threads()? On the next +page, I'll show you how to synch up when the number of threads changes.


    +
    [Tutorial Index] [Continue This Tutorial]
    diff --git a/docs/tutorials/017/page06.html b/docs/tutorials/017/page06.html new file mode 100644 index 00000000000..1d80cfbd4d4 --- /dev/null +++ b/docs/tutorials/017/page06.html @@ -0,0 +1,203 @@ + + + + + ACE Tutorial 017 + + + +
    ACE Tutorial 017
    + +
    Using the ACE_Barrier synch object
    + +

    +


    +I could have included this in the first Test object of the tutorial +but that may have complicated things a bit. What we're doing here is +recognizing when the "owner" thread adds more threads to the pool. +When we notice that, we use the barrier to wait until everything +stabilizes and then we recalibrate and move on. +

    +The source is here. +


    +
    +// $Id$
    +
    +#include "Barrier_i.h"
    +#include "ace/Task.h"
    +
    +/* We'll use a simple Task<> derivative to test our new Barrier
    +   object.
    +*/
    +class Test : public ACE_Task<ACE_NULL_SYNCH>
    +{
    +public:
    +
    +        // Construct the object with a desired thread count
    +    Test(int _threads);
    +
    +        // Open/begin the test.  As usual, we have to match the
    +        // ACE_Task signature.
    +    int open(void * _unused = 0);
    +
    +        // Change the threads_ value for the next invocation of open()
    +    void threads(int _threads);
    +
    +        // Get the current threads_ value.
    +    int threads(void);
    +
    +        // Perform the test
    +    int svc(void);
    +
    +protected:
    +        // How many threads the barrier will test.
    +    u_int threads_;
    +
    +        // The Barrier object we'll use in our tests below
    +    Barrier barrier_;
    +
    +        // This lets us pick one (eg -- the first) thread as the
    +        // "controller" for our little test...
    +    ACE_Atomic_Op<ACE_Mutex,u_int> tcount_;
    +};
    +
    +/* Construct the object & initialize the threads value for open() to
    +   use.
    +*/
    +Test::Test(int _threads)
    +        : threads_(_threads), tcount_(0)
    +{
    +}
    +
    +/* As usual, our open() will create one or more threads where we'll do 
    +   the interesting work.
    +*/  
    +int Test::open(void * _unused)
    +{
    +    ACE_UNUSED_ARG(_unused);
    +
    +        // One thing about the barrier:  You have to tell it how many
    +        // threads it will be synching.  The threads() mutator on my
    +        // Barrier class lets you do that and hides the implementation 
    +        // details at the same time.
    +    barrier_.threads(threads_);
    +
    +        // Activate the tasks as usual...
    +    return this->activate(THR_NEW_LWP, threads_, 1);
    +}
    +
    +void Test::threads(int _threads)
    +{
    +    threads_ = _threads;
    +}
    +
    +int Test::threads(void)
    +{
    +    return threads_;
    +}
    +
    +/* svc() will execute in each thread & do a few things with the
    +   Barrier we have.
    + */
    +int Test::svc(void)
    +{
    +        // Say hello to everyone first.
    +    ACE_DEBUG(( LM_INFO, "(%P|%t|%T) Created\n" ));
    +
    +        // Increment and save the "tcount" value.  We'll use it in
    +        // just a moment...
    +    int me = ++tcount_;
    +
    +        // Wait for all initial threads to get to this point before we
    +        // go any further.  This is standard barrier usage...
    +    barrier_.wait();
    +
    +        // Setup our random number generator.
    +    ACE_Time_Value now(ACE_OS::gettimeofday());
    +    ACE_RANDR_TYPE seed = now.usec();
    +    ACE_OS::srand(seed);
    +    int delay;
    +
    +        // We'll arbitrarily choose the first activated thread to be
    +        // the controller.  After it sleeps a few seconds, it will add 
    +        // five threads.
    +    if( me == 1 )
    +    {
    +            // Sleep from 1 to 10 seconds so that some of the other
    +            // threads will be into their for() loop.
    +        delay = ACE_OS::rand_r(seed)%10;
    +        ACE_OS::sleep(abs(delay)+1);
    +
    +            // Make ourselves the barrier owner so that we can change
    +            // the number of threads.  This should be done with care...
    +        barrier_.owner( ACE_OS::thr_self() );
    +
    +            // Add 5 threads to the barrier and then activate() to
    +            // make them real.  Notice the third parameter to
    +            // activate().  Without this parameter, the threads won't
    +            // be created.
    +        if( barrier_.threads(threads_+5) == 0 )
    +        {
    +            this->activate(THR_NEW_LWP,5,1);
    +        }
    +    }
    +
    +        // This for() loop represents an "infinite" work loop in an
    +        // application. The theory is that the threads are dividing up 
    +        // some work but need to "recalibrate" if more threads are
    +        // added.  I'll just do five iterations so that the test
    +        // doesn't run forever.
    +    int i;
    +    for( i = 0 ; i < 5 ; ++i )
    +    {
    +            // The sleep() represents time doing work.
    +        delay = ACE_OS::rand_r(seed)%7;
    +        ACE_OS::sleep(abs(delay)+1);
    +
    +        ACE_DEBUG(( LM_INFO, "(%P|%t|%T)\tThread %.2d of %.2d iteration %.2d\n", me, threads_, i ));
    + 
    +            // If the local threads_ variable doesn't match the number 
    +            // in the barrier, then the controller must have changed
    +            // the thread count.  We'll wait() for everyone and then
    +            // recalibrate ourselves before continuing.
    +        if( this->threads_ != barrier_.threads() )
    +        {
    +            ACE_DEBUG(( LM_INFO, "(%P|%t|%T) Waiting for thread count to increase to %d from %d\n",
    +                        barrier_.threads(), this->threads_ ));
    +
    +                // Wait for all our sibling threads...
    +            barrier_.wait();
    +
    +                // Set our local variable so that we don't come here again.
    +            this->threads_ = barrier_.threads();
    +
    +                // Recalibration can be anything you want.  At this
    +                // point, we know that all of the threads are synch'd
    +                // and ready to go.
    +        }
    +    }
    +
    +        // Re-synch all of the threads before they exit.  This isn't
    +        // really necessary but I like to do it.
    +    barrier_.done();
    +
    +    return(0);
    +}
    +
    +/* Our test application...
    + */
    +int main(int, char**)
    +{
    +        // Create the test object with 5 threads
    +    Test test(5);
    +
    +        // and open it to test the barrier.
    +    test.open();
    +        // Now wait for them all to exit.
    +    test.wait();
    +
    +    return(0);
    +}
    +
    +


    +
    [Tutorial Index]
    -- cgit v1.2.1