diff options
author | schmidt <douglascraigschmidt@users.noreply.github.com> | 1997-01-04 00:06:38 +0000 |
---|---|---|
committer | schmidt <douglascraigschmidt@users.noreply.github.com> | 1997-01-04 00:06:38 +0000 |
commit | 70d108545611dbb86049d0109ef4a7ab1ef6289e (patch) | |
tree | 790c9b07d5eac35a82ae7d9f5e7b59a6243a4b2c /tests/Thread_Pool_Test.cpp | |
parent | 1c44106287219a05ddbff09df4574b90777040ae (diff) | |
download | ATCD-70d108545611dbb86049d0109ef4a7ab1ef6289e.tar.gz |
foo
Diffstat (limited to 'tests/Thread_Pool_Test.cpp')
-rw-r--r-- | tests/Thread_Pool_Test.cpp | 171 |
1 files changed, 101 insertions, 70 deletions
diff --git a/tests/Thread_Pool_Test.cpp b/tests/Thread_Pool_Test.cpp index 2363850e7f3..ee4ce7c1ffa 100644 --- a/tests/Thread_Pool_Test.cpp +++ b/tests/Thread_Pool_Test.cpp @@ -10,20 +10,21 @@ // // = DESCRIPTION // This test program illustrates how the ACE task synchronization -// mechanisms work in conjunction with the ACE_Task and the -// ACE_Thread_Manager. If the manual flag is not set input comes -// from stdin until the user enters a return only. This stops -// all workers via a message block of length 0. This is an -// alternative shutdown of workers compared to queue deactivate. +// mechanisms and ACE_Message_Block reference counting works in +// conjunction with the ACE_Task and the ACE_Thread_Manager. If +// the manual flag is not set input comes from stdin until the +// user enters a return. This stops all workers via a message +// block of length 0. This shows an alternative way to shutdown +// worker tasks compared to queue deactivate. // // = AUTHOR // Karlheinz Dorn, Doug Schmidt, and Prashant Jain // // ============================================================================ +#define protected public #include "ace/Task.h" #include "ace/Service_Config.h" - #include "ace/Task.h" #include "test_config.h" @@ -35,22 +36,35 @@ static size_t n_iterations = 100; class Thread_Pool : public ACE_Task<ACE_MT_SYNCH> { public: - Thread_Pool (ACE_Thread_Manager *thr_mgr, int n_threads); + Thread_Pool (int n_threads); + // Create the thread pool containing <n_threads>. + ~Thread_Pool (void); + + virtual int open (void * = 0); + // Produce the messages that are consumed by the threads in the + // thread pool. + virtual int svc (void); // Iterate <n_iterations> time printing off a message and "waiting" // for all other threads to complete this iteration. virtual int put (ACE_Message_Block *mb, ACE_Time_Value *tv = 0); - // This allows the producer to pass messages to the <Thread_Pool>. + // Allows the producer to pass messages to the <Thread_Pool>. private: virtual int close (u_long); + // Close hook. - // = Not needed for this test. - virtual int open (void *) { return 0; } + ACE_Lock_Adapter<ACE_Thread_Mutex> lock_adapter_; + // Serialize access to <ACE_Message_Block> reference count, which + // will be decremented from multiple threads. }; +Thread_Pool::~Thread_Pool (void) +{ +} + int Thread_Pool::close (u_long) { @@ -58,9 +72,7 @@ Thread_Pool::close (u_long) return 0; } -Thread_Pool::Thread_Pool (ACE_Thread_Manager *thr_mgr, - int n_threads) - : ACE_Task<ACE_MT_SYNCH> (thr_mgr) +Thread_Pool::Thread_Pool (int n_threads) { // Create worker threads. if (this->activate (THR_NEW_LWP, n_threads) == -1) @@ -75,42 +87,49 @@ Thread_Pool::put (ACE_Message_Block *mb, ACE_Time_Value *tv) return this->putq (mb, tv); } -// Iterate <n_iterations> time printing off a message and "waiting" -// for all other threads to complete this iteration. +// Iterate <n_iterations> printing off a message and "waiting" for all +// other threads to complete this iteration. int Thread_Pool::svc (void) { ACE_NEW_THREAD; - // Note that the ACE_Task::svc_run () method automatically adds us to - // the Thread_Manager when the thread begins. - - int count = 1; + // The <ACE_Task::svc_run()> method automatically adds us to the + // <ACE_Service_Config>'s <ACE_Thread_Manager> when the thread + // begins. // Keep looping, reading a message out of the queue, until we get a // message with a length == 0, which signals us to quit. - for (;; count++) + for (int count = 1; ; count++) { ACE_Message_Block *mb; + ACE_DEBUG ((LM_DEBUG, "(%t) **** before head = %d, tail = %d\n", + this->msg_queue ()->head_, + this->msg_queue ()->tail_)); ACE_ASSERT (this->getq (mb) != -1); + ACE_DEBUG ((LM_DEBUG, "(%t) ++++ after head = %d, tail = %d\n", + this->msg_queue ()->head_, + this->msg_queue ()->tail_)); + int length = mb->length (); if (length > 0) ACE_DEBUG ((LM_DEBUG, - "(%t) in iteration %d, length = %d, text = \"%*s\"\n", - count, length, length - 1, mb->rd_ptr ())); + "(%t) in iteration %d, queue len = %d, length = %d, text = \"%*s\"\n", + count, this->msg_queue ()->message_count (), + length, length - 1, mb->rd_ptr ())); // We're responsible for deallocating this. - delete mb; + mb->release (); if (length == 0) { ACE_DEBUG ((LM_DEBUG, - "(%t) in iteration %d, got NULL message, exiting\n", - count)); + "(%t) in iteration %d, queue len = %d, got NULL message, exiting\n", + count, this->msg_queue ()->message_count ())); break; } } @@ -120,17 +139,22 @@ Thread_Pool::svc (void) return 0; } -static void -produce (Thread_Pool &thread_pool) +int +Thread_Pool::open (void *) { - ACE_DEBUG ((LM_DEBUG, "(%t) producer start, dumping the Thread_Pool\n")); - thread_pool.dump (); + ACE_DEBUG ((LM_DEBUG, + "(%t) producer start, dumping the Thread_Pool\n")); + this->dump (); + + ACE_Message_Block *mb; for (int n;;) { // Allocate a new message. - ACE_Message_Block *mb; - ACE_NEW (mb, ACE_Message_Block (BUFSIZ)); + ACE_NEW_RETURN (mb, + ACE_Message_Block (BUFSIZ, ACE_Message_Block::MB_DATA, + 0, 0, 0, &this->lock_adapter_), + -1); #if defined (manual) ACE_DEBUG ((LM_DEBUG, @@ -140,50 +164,53 @@ produce (Thread_Pool &thread_pool) static size_t count = 0; ACE_OS::sprintf (mb->rd_ptr (), "%d\n", count); - n = ACE_OS::strlen (mb->rd_ptr ()); - if (count == n_iterations) - n = 1; // Indicate that we need to shut down. + if (count == n_iterations || n <= 1) + break; else count++; if (count == 0 || (count % 20 == 0)) ACE_OS::sleep (1); #endif /* manual */ - if (n > 1) - { - // Send a normal message to the waiting threads and continue - // producing. - mb->wr_ptr (n); - - // Pass the message to the Thread_Pool. - if (thread_pool.put (mb) == -1) - ACE_ERROR ((LM_ERROR, " (%t) %p\n", "put")); - } - else - { - // Send a shutdown message to the waiting threads and exit. - ACE_DEBUG ((LM_DEBUG, "\n(%t) start loop, dump of task:\n")); - thread_pool.dump (); - - for (int i = thread_pool.thr_count (); i > 0; i--) - { - ACE_DEBUG ((LM_DEBUG, - "(%t) EOF, enqueueing NULL block for thread = %d\n", - i)); - - // Enqueue a NULL message to flag each consumer to - // shutdown. - if (thread_pool.put (new ACE_Message_Block) == -1) - ACE_ERROR ((LM_ERROR, " (%t) %p\n", "put")); - } - - ACE_DEBUG ((LM_DEBUG, "\n(%t) end loop, dump of task:\n")); - thread_pool.dump (); - break; - } + // Send a normal message to the waiting threads and continue + // producing. + mb->wr_ptr (n); + + // Pass the message to the Thread_Pool. + if (this->put (mb) == -1) + ACE_ERROR ((LM_ERROR, " (%t) %p\n", "put")); } + + // Send a shutdown message to the waiting threads and exit. + ACE_DEBUG ((LM_DEBUG, + "\n(%t) sending shutdown message to %d threads, dump of task:\n", + this->thr_count ())); + this->dump (); + + ACE_NEW_RETURN (mb, + ACE_Message_Block (0, ACE_Message_Block::MB_DATA, + 0, 0, 0, &this->lock_adapter_), + -1); + + for (int i = this->thr_count (); i > 0; i--) + { + ACE_DEBUG ((LM_DEBUG, + "(%t) EOF, enqueueing NULL block for thread = %d\n", + i)); + + // Enqueue an empty message to flag each consumer to shutdown. + // Note that we use reference counting to avoid having to copy + // the message. + if (this->put (mb->duplicate ()) == -1) + ACE_ERROR ((LM_ERROR, " (%t) %p\n", "put")); + } + + mb->release (); + + ACE_DEBUG ((LM_DEBUG, "\n(%t) end loop, dump of task:\n")); + this->dump (); } #endif /* ACE_HAS_THREADS */ @@ -198,17 +225,21 @@ main (int, char *[]) ACE_DEBUG ((LM_DEBUG, "(%t) threads = %d\n", n_threads)); // Create the worker tasks. - Thread_Pool thread_pool (ACE_Service_Config::thr_mgr (), - n_threads); + Thread_Pool thread_pool (n_threads); // Create work for the worker tasks to process in their own threads. - produce (thread_pool); + thread_pool.open (); // Wait for all the threads to reach their exit point. - ACE_DEBUG ((LM_DEBUG, "(%t) waiting with thread manager...\n")); + ACE_DEBUG ((LM_DEBUG, "(%t) waiting for worker tasks to finish...\n")); + ACE_Service_Config::thr_mgr ()->wait (); + ACE_ASSERT (thread_pool.msg_queue ()->is_empty ()); + ACE_DEBUG ((LM_DEBUG, "(%t) head = %d, tail = %d\n", + thread_pool.msg_queue ()->head_, + thread_pool.msg_queue ()->tail_)); ACE_DEBUG ((LM_DEBUG, "(%t) destroying worker tasks and exiting...\n")); #else ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n")); |