diff options
Diffstat (limited to 'examples/Threads/thread_pool.cpp')
-rw-r--r-- | examples/Threads/thread_pool.cpp | 268 |
1 files changed, 0 insertions, 268 deletions
diff --git a/examples/Threads/thread_pool.cpp b/examples/Threads/thread_pool.cpp deleted file mode 100644 index e5ad4c5aaeb..00000000000 --- a/examples/Threads/thread_pool.cpp +++ /dev/null @@ -1,268 +0,0 @@ -// $Id$ - -// This test program illustrates how the <ACE_Task> synchronization -// mechanisms work in conjunction with the <ACE_Thread_Manager>. If -// the <manual> flag is set input comes from stdin until the user -// enters a return -- otherwise, the input is generated automatically. -// All worker threads shutdown when they receive a message block of -// length 0. -// -// This code is original based on a test program written by Karlheinz -// Dorn <Karlheinz.Dorn@med.siemens.de>. It was modified to utilize -// more ACE features by Doug Schmidt <schmidt@cs.wustl.edu>. - -#include "ace/OS_NS_stdio.h" -#include "ace/OS_NS_string.h" -#include "ace/OS_NS_unistd.h" -#include "ace/OS_main.h" -#include "ace/Task.h" -#include "ace/Service_Config.h" - -ACE_RCSID(Threads, thread_pool, "$Id$") - -#if defined (ACE_HAS_THREADS) - -// Default number of iterations to run the test. -static int n_iterations = 100; - -// Controls whether the input is generated "manually" or automatically. -static int manual = 0; - -class Thread_Pool : public ACE_Task<ACE_MT_SYNCH> -{ - // = TITLE - // Defines a thread pool abstraction based on the <ACE_Task>. -public: - Thread_Pool (ACE_Thread_Manager *thr_mgr, - int n_threads); - // Constructor activates <n_threads> in the thread pool. - - ~Thread_Pool (void); - // Destructor... - - virtual int svc (void); - // Iterate <n_iterations> time printing off a message and "waiting" - // for all other threads to complete this iteration. - - virtual int put (ACE_Message_Block *mb, - ACE_Time_Value *tv = 0); - // This allows the producer to pass messages to the <Thread_Pool>. - -private: - virtual int close (u_long); - // Close hook. -}; - -int -Thread_Pool::close (u_long) -{ - ACE_DEBUG ((LM_DEBUG, - "(%t) worker thread closing down\n")); - return 0; -} - -Thread_Pool::Thread_Pool (ACE_Thread_Manager *thr_mgr, - int n_threads) - : ACE_Task<ACE_MT_SYNCH> (thr_mgr) -{ - // Create the pool of worker threads. - if (this->activate (THR_NEW_LWP, - n_threads) == -1) - ACE_ERROR ((LM_ERROR, - "%p\n", - "activate failed")); -} - -Thread_Pool::~Thread_Pool (void) -{ -} - -// Simply enqueue the Message_Block into the end of the queue. - -int -Thread_Pool::put (ACE_Message_Block *mb, - ACE_Time_Value *tv) -{ - return this->putq (mb, tv); -} - -// Iterate <n_iterations> time printing off a message and "waiting" -// for all other threads to complete this iteration. - -int -Thread_Pool::svc (void) -{ - // Note that the <ACE_Task::svc_run> method automatically adds us to - // the Thread_Manager when the thread begins. - - int count = 1; - - // Keep looping, reading a message out of the queue, until we get a - // message with a length == 0, which signals us to quit. - - for (;; count++) - { - ACE_Message_Block *mb; - - ACE_DEBUG ((LM_DEBUG, - "(%t) in iteration %d before getq ()\n", - count)); - - if (this->getq (mb) == -1) - { - ACE_ERROR ((LM_ERROR, - "(%t) in iteration %d, got result -1, exiting\n", - count)); - break; - } - - int length = mb->length (); - - if (length > 0) - ACE_DEBUG ((LM_DEBUG, - "(%t) in iteration %d, length = %d, text = \"%*s\"\n", - count, - length, - length - 1, - mb->rd_ptr ())); - - // We're responsible for deallocating this. - mb->release (); - - if (length == 0) - { - ACE_DEBUG ((LM_DEBUG, - "(%t) in iteration %d, got NULL message, exiting\n", - count)); - break; - } - } - - // Note that the <ACE_Task::svc_run> method automatically removes us - // from the <ACE_Thread_Manager> when the thread exits. - return 0; -} - -static void -producer (Thread_Pool &thread_pool) -{ - ACE_DEBUG ((LM_DEBUG, - "(%t) producer start, generating data for the <Thread_Pool>\n")); - // thread_pool.dump (); - - for (int n; ;) - { - // Allocate a new message. - ACE_Message_Block *mb; - ACE_NEW (mb, - ACE_Message_Block (BUFSIZ)); - - if (manual) - { - ACE_DEBUG ((LM_DEBUG, - "(%t) enter a new message for the task pool...")); - n = ACE_OS::read (ACE_STDIN, - mb->rd_ptr (), - mb->size ()); - } - else - { - static int count = 0; - - ACE_OS::sprintf (mb->rd_ptr (), - "%d\n", - count); - n = ACE_OS::strlen (mb->rd_ptr ()); - - if (count == n_iterations) - n = 1; // Indicate that we need to shut down. - else - count++; - - if (count == 0 || (count % 20 == 0)) - ACE_OS::sleep (1); - } - - if (n > 1) - { - // Send a normal message to the waiting threads and continue - // producing. - mb->wr_ptr (n); - - // Pass the message to the Thread_Pool. - if (thread_pool.put (mb) == -1) - ACE_ERROR ((LM_ERROR, - " (%t) %p\n", - "put")); - } - else - { - ACE_DEBUG ((LM_DEBUG, - "\n(%t) start loop, dump of task:\n")); - // thread_pool.dump (); - - // Send a shutdown message to the waiting threads and exit. - for (int i = thread_pool.thr_count (); i > 0; i--) - { - ACE_DEBUG ((LM_DEBUG, - "(%t) EOF, enqueueing NULL block for thread = %d\n", - i)); - - // Enqueue a NULL message to flag each consumer to - // shutdown. - ACE_Message_Block *mb; - ACE_NEW (mb, - ACE_Message_Block); - if (thread_pool.put (mb) == -1) - ACE_ERROR ((LM_ERROR, - " (%t) %p\n", - "put")); - } - - ACE_DEBUG ((LM_DEBUG, - "\n(%t) end loop\n")); - // thread_pool.dump (); - break; - } - } -} - -int -ACE_TMAIN (int argc, ACE_TCHAR *argv[]) -{ - int n_threads = argc > 1 ? ACE_OS::atoi (argv[1]) : ACE_DEFAULT_THREADS; - n_iterations = argc > 2 ? ACE_OS::atoi (argv[2]) : n_iterations; - manual = argc > 3 ? 1 : 0; - - ACE_DEBUG ((LM_DEBUG, - "(%t) argc = %d, threads = %d\n", - argc, - n_threads)); - - // Create the worker tasks. - Thread_Pool thread_pool (ACE_Thread_Manager::instance (), - n_threads); - - // Create work for the worker tasks to process in their own threads. - producer (thread_pool); - - ACE_DEBUG ((LM_DEBUG, - "(%t) waiting for threads to exit in Thread_Pool destructor...\n")); - // Wait for all the threads to reach their exit point. - if (thread_pool.wait () == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) wait() failed\n"), - 1); - - ACE_DEBUG ((LM_DEBUG, - "(%t) destroying worker tasks and exiting...\n")); - return 0; -} -#else -int -ACE_TMAIN (int, ACE_TCHAR *[]) -{ - ACE_ERROR ((LM_ERROR, - "threads not supported on this platform\n")); - return 0; -} -#endif /* ACE_HAS_THREADS */ |