diff options
Diffstat (limited to 'examples/Threads/barrier2.cpp')
-rw-r--r-- | examples/Threads/barrier2.cpp | 317 |
1 files changed, 0 insertions, 317 deletions
diff --git a/examples/Threads/barrier2.cpp b/examples/Threads/barrier2.cpp deleted file mode 100644 index c621190ad0b..00000000000 --- a/examples/Threads/barrier2.cpp +++ /dev/null @@ -1,317 +0,0 @@ -// $Id$ - -// This test program illustrates how the ACE task workers/barrier -// synchronization mechanisms work in conjunction with the ACE_Task -// and the ACE_Thread_Manager. The manual flag not set simulates user -// input, if set input comes from stdin until RETURN only is entered -// which stops all workers via a message block of length 0. This is an -// alernative shutdown of workers compared to queue deactivate. The -// delay_put flag simulates a delay between the shutdown puts. All -// should work with this flag disabled! The BARRIER_TYPE is supposed -// to enable/disable barrier sync on each svc a worker has done. - -#include "ace/Task.h" -#include "ace/Service_Config.h" - -ACE_RCSID(Threads, barrier2, "$Id$") - -#if defined (ACE_HAS_THREADS) - -#define BARRIER_TYPE ACE_Null_Barrier - -template <class BARRIER> -class Worker_Task : public ACE_Task<ACE_MT_SYNCH> -{ -public: - Worker_Task (ACE_Thread_Manager *thr_mgr, - int n_threads, - int inp_serialize = 1); - - virtual int producer (void); - // produce input for workers - - virtual int input (ACE_Message_Block *mb); - // Fill one message block via a certain input strategy. - - virtual int output (ACE_Message_Block *mb); - // Forward one message block via a certain output strategy to the - // next task if any. - - virtual int service (ACE_Message_Block *mb, int iter); - // Perform one message block dependant service. - -private: - virtual int put (ACE_Message_Block *mb, ACE_Time_Value *tv=0); - - virtual int svc (void); - // Iterate <n_iterations> time printing off a message and "waiting" - // for all other threads to complete this iteration. - - // = Not needed for this test. - virtual int open (void *) { return 0; } - virtual int close (u_long) - { - ACE_DEBUG ((LM_DEBUG, - "(%t) in close of worker\n")); - return 0; - } - - int nt_; - // Number of worker threads to run. - - int inp_serialize_; - - BARRIER barrier_; -}; - -template <class BARRIER> -Worker_Task<BARRIER>::Worker_Task (ACE_Thread_Manager *thr_mgr, - int n_threads, - int inp_serialize) - : ACE_Task<ACE_MT_SYNCH> (thr_mgr), - barrier_ (n_threads) -{ - nt_ = n_threads; - - // Create worker threads. - inp_serialize_ = inp_serialize; - - // Use the task's message queue for serialization (default) or run - // service in the context of the caller thread. - - if (nt_ > 0 && inp_serialize == 1) - if (this->activate (THR_NEW_LWP, n_threads) == -1) - ACE_ERROR ((LM_ERROR, - "%p\n", - "activate failed")); -} - -// Simply enqueue the Message_Block into the end of the queue. - -template <class BARRIER> int -Worker_Task<BARRIER>::put (ACE_Message_Block *mb, - ACE_Time_Value *tv) -{ - int result; - - if (this->inp_serialize_) - result = this->putq (mb, tv); - else - { - static int iter = 0; - result = this->service (mb, iter++); - - if (this->output (mb) < 0) - ACE_DEBUG ((LM_DEBUG, - "(%t) output not connected!\n")); - - mb->release (); - } - return result; -} - -template <class BARRIER> int -Worker_Task<BARRIER>::service (ACE_Message_Block *mb, - int iter) -{ - int length = mb->length (); - - if (length > 0) - { - ACE_DEBUG ((LM_DEBUG, - "(%t) in iteration %d len=%d text got:\n", - iter, - length)); - ACE_OS::write (ACE_STDOUT, - mb->rd_ptr (), - length); - ACE_DEBUG ((LM_DEBUG, - "\n")); - } - return 0; -} - -// Iterate <n_iterations> time printing off a message and "waiting" -// for all other threads to complete this iteration. - -template <class BARRIER> int -Worker_Task<BARRIER>::svc (void) -{ - // Note that the <ACE_Task::svc_run> method automatically adds us to - // the Thread_Manager when the thread begins. - - // Keep looping, reading a message out of the queue, until we get a - // message with a length == 0, which signals us to quit. - - for (int iter = 1; ;iter++) - { - ACE_Message_Block *mb = 0; - - int result = this->getq (mb); - - if (result == -1) - { - ACE_ERROR ((LM_ERROR, - "(%t) in iteration %d\n", - "error waiting for message in iteration", - iter)); - break; - } - - int length = mb->length (); - this->service (mb,iter); - - if (length == 0) - { - ACE_DEBUG ((LM_DEBUG, - "(%t) in iteration %d got quit, exit!\n", - iter)); - mb->release (); - break; - } - - this->barrier_.wait (); - this->output (mb); - - mb->release (); - } - - // Note that the <ACE_Task::svc_run> method automatically removes us - // from the Thread_Manager when the thread exits. - - return 0; -} - -template <class BARRIER> int -Worker_Task<BARRIER>::producer (void) -{ - // Keep reading stdin, until we reach EOF. - - for (;;) - { - // Allocate a new message. - ACE_Message_Block *mb; - - ACE_NEW_RETURN (mb, - ACE_Message_Block (BUFSIZ), - -1); - - if (this->input (mb) == -1) - return -1; - } - - ACE_NOTREACHED (return 0); -} - -template <class BARRIER> int -Worker_Task<BARRIER>::output (ACE_Message_Block *mb) -{ - return this->put_next (mb); -} - -template <class BARRIER> int -Worker_Task<BARRIER>::input (ACE_Message_Block *mb) -{ - ACE_Message_Block *mb1; - -#if !defined (manual) - static int l = 0; - char str[] = "kalle"; - ACE_OS::strcpy (mb->rd_ptr (), str); - - int n = ACE_OS::strlen (str); - - if (l == 1000) - n = 1; - l++; - - if (l == 0 || (l % 100 == 0)) - ACE_OS::sleep (5); - if (n <= 1) -#else - ACE_DEBUG ((LM_DEBUG, - "(%t) press chars and enter to put a new message into task queue ...\n")); - n = ACE_OS::read (ACE_STDIN, - mb->rd_ptr (), - mb->size ()); - if (n <= 1) -#endif /* manual */ - { - // Send a shutdown message to the waiting threads and exit. - // cout << "\nvor loop, dump of task msg queue:\n" << endl; - // this->msg_queue ()->dump (); - - for (int i = 0; i < nt_; i++) - { - ACE_DEBUG ((LM_DEBUG, - "(%t) eof, sending block for thread=%d\n", - i + 1)); - - ACE_NEW_RETURN (mb1, - ACE_Message_Block (2), - -1); - mb1->length (0); - - if (this->put (mb1) == -1) - ACE_ERROR ((LM_ERROR, - "(%t) %p\n", - "put")); -#if defined (delay_put) - // this sleep helps to shutdown correctly -> was an error! - ACE_OS::sleep (1); -#endif /* delay_put */ - } - return -1; - } - else - { - // Send a normal message to the waiting threads and continue - // producing. - mb->wr_ptr (n); - - if (this->put (mb) == -1) - ACE_ERROR ((LM_ERROR, - "(%t) %p\n", - "put")); - } - return 0; -} - -int -main (int argc, char *argv[]) -{ - int n_threads = argc > 1 ? ACE_OS::atoi (argv[1]) : ACE_DEFAULT_THREADS; - - ACE_DEBUG ((LM_DEBUG, - "(%t) worker threads running=%d\n", - n_threads)); - - Worker_Task<BARRIER_TYPE> worker_task (ACE_Thread_Manager::instance (), - /* n_threads */ 0, - 0); - worker_task.producer (); - - // Wait for all the threads to reach their exit point. - ACE_DEBUG ((LM_DEBUG, - "(%t) waiting with thread manager ...\n")); - - ACE_Thread_Manager::instance ()->wait (); - - ACE_DEBUG ((LM_DEBUG, - "(%t) done correct!\n")); - return 0; -} - -#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) -template class Worker_Task<ACE_Null_Barrier>; -#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) -#pragma instantiate Worker_Task<ACE_Null_Barrier> -#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ -#else -int -main (int, char *[]) -{ - ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n")); - return 0; -} -#endif /* ACE_HAS_THREADS */ |