diff options
author | schmidt <douglascraigschmidt@users.noreply.github.com> | 1998-07-17 22:01:36 +0000 |
---|---|---|
committer | schmidt <douglascraigschmidt@users.noreply.github.com> | 1998-07-17 22:01:36 +0000 |
commit | fff0c99afb5df8dd3d742ad901c366658c6037a0 (patch) | |
tree | 1d50392a87581862fd903001844132830291092a /examples/Threads | |
parent | 9a36bc143fc14cc03f14d4e83d829288588b55be (diff) | |
download | ATCD-fff0c99afb5df8dd3d742ad901c366658c6037a0.tar.gz |
*** empty log message ***
Diffstat (limited to 'examples/Threads')
-rw-r--r-- | examples/Threads/barrier2.cpp | 177 |
1 files changed, 108 insertions, 69 deletions
diff --git a/examples/Threads/barrier2.cpp b/examples/Threads/barrier2.cpp index 940988cf208..ca3a6890f8e 100644 --- a/examples/Threads/barrier2.cpp +++ b/examples/Threads/barrier2.cpp @@ -1,17 +1,14 @@ // $Id$ -// generic_worker_task.cpp -// // This test program illustrates how the ACE task workers/barrier // synchronization mechanisms work in conjunction with the ACE_Task -// and the ACE_Thread_Manager. The manual flag not set simulates -// user input, if set input comes from stdin until RETURN only is -// entered which stops all workers via a message block of length -// 0. This is an alernative shutdown of workers compared to queue -// deactivate. The delay_put flag simulates a delay between the -// shutdown puts. All should work with this flag disabled! The -// BARRIER_TYPE is supposed to enable/disable barrier sync on each svc -// a worker has done. +// and the ACE_Thread_Manager. The manual flag not set simulates user +// input, if set input comes from stdin until RETURN only is entered +// which stops all workers via a message block of length 0. This is an +// alernative shutdown of workers compared to queue deactivate. The +// delay_put flag simulates a delay between the shutdown puts. All +// should work with this flag disabled! The BARRIER_TYPE is supposed +// to enable/disable barrier sync on each svc a worker has done. #include "ace/Task.h" #include "ace/Service_Config.h" @@ -19,20 +16,16 @@ #if defined (ACE_HAS_THREADS) #define BARRIER_TYPE ACE_Null_Barrier -//#define BARRIER_TYPE ACE_Barrier -//#ifdef delay_put -//#define manual template <class BARRIER> class Worker_Task : public ACE_Task<ACE_MT_SYNCH> { public: - Worker_Task (ACE_Thread_Manager *thr_mgr, int n_threads, int inp_serialize = 1); - virtual int Producer (void); + virtual int producer (void); // produce input for workers virtual int input (ACE_Message_Block *mb); @@ -54,10 +47,16 @@ private: // = Not needed for this test. virtual int open (void *) { return 0; } - virtual int close (u_long) {ACE_DEBUG ((LM_DEBUG,"(%t) in close of worker\n")); return 0; } + virtual int close (u_long) + { + ACE_DEBUG ((LM_DEBUG, + "(%t) in close of worker\n")); + return 0; + } int nt_; // Number of worker threads to run. + int inp_serialize_; BARRIER barrier_; @@ -71,6 +70,7 @@ Worker_Task<BARRIER>::Worker_Task (ACE_Thread_Manager *thr_mgr, barrier_ (n_threads) { nt_ = n_threads; + // Create worker threads. inp_serialize_ = inp_serialize; @@ -79,15 +79,19 @@ Worker_Task<BARRIER>::Worker_Task (ACE_Thread_Manager *thr_mgr, if (nt_ > 0 && inp_serialize == 1) if (this->activate (THR_NEW_LWP, n_threads) == -1) - ACE_ERROR ((LM_ERROR, "%p\n", "activate failed")); + ACE_ERROR ((LM_ERROR, + "%p\n", + "activate failed")); } // Simply enqueue the Message_Block into the end of the queue. template <class BARRIER> int -Worker_Task<BARRIER>::put (ACE_Message_Block *mb, ACE_Time_Value *tv) +Worker_Task<BARRIER>::put (ACE_Message_Block *mb, + ACE_Time_Value *tv) { int result; + if (this->inp_serialize_) result = this->putq (mb, tv); else @@ -96,7 +100,8 @@ Worker_Task<BARRIER>::put (ACE_Message_Block *mb, ACE_Time_Value *tv) result = this->service (mb, iter++); if (this->output (mb) < 0) - ACE_DEBUG ((LM_DEBUG, "(%t) output not connected!\n")); + ACE_DEBUG ((LM_DEBUG, + "(%t) output not connected!\n")); mb->release (); } @@ -104,15 +109,22 @@ Worker_Task<BARRIER>::put (ACE_Message_Block *mb, ACE_Time_Value *tv) } template <class BARRIER> int -Worker_Task<BARRIER>::service (ACE_Message_Block *mb, int iter) +Worker_Task<BARRIER>::service (ACE_Message_Block *mb, + int iter) { int length = mb->length (); if (length > 0) { - ACE_DEBUG ((LM_DEBUG,"(%t) in iteration %d len=%d text got:\n",iter,length)); - ACE_OS::write (ACE_STDOUT, mb->rd_ptr (), length); - ACE_DEBUG ((LM_DEBUG,"\n")); + ACE_DEBUG ((LM_DEBUG, + "(%t) in iteration %d len=%d text got:\n", + iter, + length)); + ACE_OS::write (ACE_STDOUT, + mb->rd_ptr (), + length); + ACE_DEBUG ((LM_DEBUG, + "\n")); } return 0; } @@ -123,8 +135,8 @@ Worker_Task<BARRIER>::service (ACE_Message_Block *mb, int iter) template <class BARRIER> int Worker_Task<BARRIER>::svc (void) { - // Note that the ACE_Task::svc_run () method automatically adds us - // to the Thread_Manager when the thread begins. + // Note that the <ACE_Task::svc_run> method automatically adds us to + // the Thread_Manager when the thread begins. // Keep looping, reading a message out of the queue, until we get a // message with a length == 0, which signals us to quit. @@ -138,7 +150,9 @@ Worker_Task<BARRIER>::svc (void) if (result == -1) { ACE_ERROR ((LM_ERROR, - "(%t) in iteration %d\n", "error waiting for message in iteration", iter)); + "(%t) in iteration %d\n", + "error waiting for message in iteration", + iter)); break; } @@ -147,7 +161,9 @@ Worker_Task<BARRIER>::svc (void) if (length == 0) { - ACE_DEBUG ((LM_DEBUG, "(%t) in iteration %d got quit, exit!\n", iter)); + ACE_DEBUG ((LM_DEBUG, + "(%t) in iteration %d got quit, exit!\n", + iter)); mb->release (); break; } @@ -158,79 +174,103 @@ Worker_Task<BARRIER>::svc (void) mb->release (); } - // Note that the ACE_Task::svc_run () method automatically removes - // us from the Thread_Manager when the thread exits. + // Note that the <ACE_Task::svc_run> method automatically removes us + // from the Thread_Manager when the thread exits. return 0; } template <class BARRIER> int -Worker_Task<BARRIER>::Producer (void) +Worker_Task<BARRIER>::producer (void) { // Keep reading stdin, until we reach EOF. for (;;) { // Allocate a new message. - ACE_Message_Block *mb = new ACE_Message_Block (BUFSIZ); + ACE_Message_Block *mb; + + ACE_NEW_RETURN (mb, + ACE_Message_Block (BUFSIZ), + -1); if (this->input (mb) == -1) return -1; } - ACE_NOTREACHED(return 0); + ACE_NOTREACHED (return 0); } -template <class BARRIER>int +template <class BARRIER> int Worker_Task<BARRIER>::output (ACE_Message_Block *mb) { return this->put_next (mb); } -template <class BARRIER>int +template <class BARRIER> int Worker_Task<BARRIER>::input (ACE_Message_Block *mb) { ACE_Message_Block *mb1; -#ifndef manual - static int l= 0; - char str[]="kalle"; - strcpy (mb->rd_ptr (),str); - int n=strlen (str); - if (l==1000) - n=1; +#if !defined (manual) + static int l = 0; + char str[] = "kalle"; + ACE_OS::strcpy (mb->rd_ptr (), str); + + int n = ACE_OS::strlen (str); + + if (l == 1000) + n = 1; l++; - if (l==0 || (l%100 == 0)) ACE_OS::sleep (5); + + if (l == 0 || (l % 100 == 0)) + ACE_OS::sleep (5); if (n <= 1) #else - ACE_DEBUG ((LM_DEBUG,"(%t) press chars and enter to put a new message into task queue ...\n")); - if ((n = read (0, mb->rd_ptr (), mb->size ())) <= 1) -#endif // manual + ACE_DEBUG ((LM_DEBUG, + "(%t) press chars and enter to put a new message into task queue ...\n")); + n = ACE_OS::read (ACE_STDIN, + mb->rd_ptr (), + mb->size ()); + if (n <= 1) +#endif /* manual */ { // Send a shutdown message to the waiting threads and exit. // cout << "\nvor loop, dump of task msg queue:\n" << endl; // this->msg_queue ()->dump (); - for (int i=0;i<nt_;i++) + + for (int i = 0; i < nt_; i++) { - ACE_DEBUG ((LM_DEBUG,"(%t) eof, sending block for thread=%d\n",i+1)); - mb1 = new ACE_Message_Block (2); + ACE_DEBUG ((LM_DEBUG, + "(%t) eof, sending block for thread=%d\n", + i + 1)); + + ACE_NEW_RETURN (mb1, + ACE_Message_Block (2), + -1); mb1->length (0); + if (this->put (mb1) == -1) - ACE_ERROR ((LM_ERROR, "(%t) %p\n", "put")); -#ifdef delay_put - ACE_OS::sleep (1); // this sleep helps to shutdown correctly -> was an error! + ACE_ERROR ((LM_ERROR, + "(%t) %p\n", + "put")); +#if defined (delay_put) + // this sleep helps to shutdown correctly -> was an error! + ACE_OS::sleep (1); #endif /* delay_put */ } - // cout << "\nnach loop, dump of task msg queue:\n" << endl; - // this->msg_queue ()->dump (); - return (-1); + return -1; } else { - // Send a normal message to the waiting threads and continue producing. + // Send a normal message to the waiting threads and continue + // producing. mb->wr_ptr (n); + if (this->put (mb) == -1) - ACE_ERROR ((LM_ERROR, "(%t) %p\n", "put")); + ACE_ERROR ((LM_ERROR, + "(%t) %p\n", + "put")); } return 0; } @@ -240,22 +280,23 @@ main (int argc, char *argv[]) { int n_threads = argc > 1 ? ACE_OS::atoi (argv[1]) : ACE_DEFAULT_THREADS; - ACE_DEBUG ((LM_DEBUG,"(%t) worker threads running=%d\n",n_threads)); + ACE_DEBUG ((LM_DEBUG, + "(%t) worker threads running=%d\n", + n_threads)); - - Worker_Task<BARRIER_TYPE> *worker_task = - new Worker_Task<BARRIER_TYPE> (ACE_Thread_Manager::instance (), - /*n_threads*/ 0,0); - - worker_task->Producer (); + Worker_Task<BARRIER_TYPE> worker_task (ACE_Thread_Manager::instance (), + /* n_threads */ 0, + 0); + worker_task->producer (); // Wait for all the threads to reach their exit point. - ACE_DEBUG ((LM_DEBUG,"(%t) waiting with thread manager ...\n")); + ACE_DEBUG ((LM_DEBUG, + "(%t) waiting with thread manager ...\n")); + ACE_Thread_Manager::instance ()->wait (); - ACE_DEBUG ((LM_DEBUG,"(%t) delete worker task ...\n")); - delete worker_task; - ACE_DEBUG ((LM_DEBUG,"(%t) done correct!\n")); + ACE_DEBUG ((LM_DEBUG, + "(%t) done correct!\n")); return 0; } @@ -264,8 +305,6 @@ template class Worker_Task<ACE_Null_Barrier>; #elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) #pragma instantiate Worker_Task<ACE_Null_Barrier> #endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ - - #else int main (int, char *[]) |