diff options
author | schmidt <douglascraigschmidt@users.noreply.github.com> | 2000-01-18 03:18:18 +0000 |
---|---|---|
committer | schmidt <douglascraigschmidt@users.noreply.github.com> | 2000-01-18 03:18:18 +0000 |
commit | 948d6bfec7f0b11d5ea06b8c71bc7f076832fdb3 (patch) | |
tree | 8f082fc22f230edfd8169803c4be9524245e70a3 | |
parent | 40460185f7e4b1126335c841366a14ef9ecb1837 (diff) | |
download | ATCD-948d6bfec7f0b11d5ea06b8c71bc7f076832fdb3.tar.gz |
ChangeLogTag:Mon Jan 17 18:03:17 2000 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu>
-rw-r--r-- | ChangeLog | 32 | ||||
-rw-r--r-- | ChangeLogs/ChangeLog-02a | 32 | ||||
-rw-r--r-- | ChangeLogs/ChangeLog-03a | 32 | ||||
-rw-r--r-- | ace/Message_Queue.h | 26 | ||||
-rw-r--r-- | ace/Message_Queue_T.h | 2 | ||||
-rw-r--r-- | ace/Task_T.cpp | 13 | ||||
-rw-r--r-- | ace/Task_T.h | 11 | ||||
-rw-r--r-- | examples/Threads/thread_pool.cpp | 39 | ||||
-rw-r--r-- | tests/Thread_Pool_Test.cpp | 341 |
9 files changed, 396 insertions, 132 deletions
diff --git a/ChangeLog b/ChangeLog index daebfc7eb2f..db870295d79 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,33 @@ +Mon Jan 17 18:03:17 2000 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu> + + * tests/Thread_Pool_Test.cpp: Enhanced this test to illustrate how + to shut down Tasks using either the "empty message" strategy or + the "queue deactivation" strategy. + + * ace/Message_Queue.h: Updated the documentation of the enqueue*() + and dequeue*() methods to clarify which errno values are set + when the calls return -1. + + * examples/Threads/thread_pool.cpp: Updated this example to + remove the use of the now-defunct "wait_for_threads_to_shutdown" + feature of ACE_Task. + + * ace/Task_T: Removed the recent feature added on + + Sat Jan 8 09:44:51 2000 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu> + + that allowed to an ACE_Task's destructor to wait for threads in + a task to exit. It turns out this is practically impossible to + use correctly because of the way that destructors are destroyed + from the "top down". However, it's trivial to get the same + behavior by simply calling the Tasks's wait() method whenever + you want to implement barrier synchronization on a Task's thread + exits. + + * tests/Thread_Pool_Test.cpp: Updated this test to illustrate + various strategies to wait for threads to exit. Thanks to Mark + C. Barnes <marcus@muse3d.com> for motivating this example. + Mon Jan 17 14:20:17 2000 Nanbor Wang <nanbor@cs.wustl.edu> * ace/OS.h: @@ -273,7 +303,7 @@ Sun Jan 9 00:25:58 2000 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu> reporting this problem with VxWorks. * ace/Pair_T.h: Reformatted to conform to the ACE style. - + Sat Jan 8 09:44:51 2000 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu> * examples/Threads/thread_pool.cpp: Revised the example to diff --git a/ChangeLogs/ChangeLog-02a b/ChangeLogs/ChangeLog-02a index daebfc7eb2f..db870295d79 100644 --- a/ChangeLogs/ChangeLog-02a +++ b/ChangeLogs/ChangeLog-02a @@ -1,3 +1,33 @@ +Mon Jan 17 18:03:17 2000 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu> + + * tests/Thread_Pool_Test.cpp: Enhanced this test to illustrate how + to shut down Tasks using either the "empty message" strategy or + the "queue deactivation" strategy. + + * ace/Message_Queue.h: Updated the documentation of the enqueue*() + and dequeue*() methods to clarify which errno values are set + when the calls return -1. + + * examples/Threads/thread_pool.cpp: Updated this example to + remove the use of the now-defunct "wait_for_threads_to_shutdown" + feature of ACE_Task. + + * ace/Task_T: Removed the recent feature added on + + Sat Jan 8 09:44:51 2000 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu> + + that allowed to an ACE_Task's destructor to wait for threads in + a task to exit. It turns out this is practically impossible to + use correctly because of the way that destructors are destroyed + from the "top down". However, it's trivial to get the same + behavior by simply calling the Tasks's wait() method whenever + you want to implement barrier synchronization on a Task's thread + exits. + + * tests/Thread_Pool_Test.cpp: Updated this test to illustrate + various strategies to wait for threads to exit. Thanks to Mark + C. Barnes <marcus@muse3d.com> for motivating this example. + Mon Jan 17 14:20:17 2000 Nanbor Wang <nanbor@cs.wustl.edu> * ace/OS.h: @@ -273,7 +303,7 @@ Sun Jan 9 00:25:58 2000 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu> reporting this problem with VxWorks. * ace/Pair_T.h: Reformatted to conform to the ACE style. - + Sat Jan 8 09:44:51 2000 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu> * examples/Threads/thread_pool.cpp: Revised the example to diff --git a/ChangeLogs/ChangeLog-03a b/ChangeLogs/ChangeLog-03a index daebfc7eb2f..db870295d79 100644 --- a/ChangeLogs/ChangeLog-03a +++ b/ChangeLogs/ChangeLog-03a @@ -1,3 +1,33 @@ +Mon Jan 17 18:03:17 2000 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu> + + * tests/Thread_Pool_Test.cpp: Enhanced this test to illustrate how + to shut down Tasks using either the "empty message" strategy or + the "queue deactivation" strategy. + + * ace/Message_Queue.h: Updated the documentation of the enqueue*() + and dequeue*() methods to clarify which errno values are set + when the calls return -1. + + * examples/Threads/thread_pool.cpp: Updated this example to + remove the use of the now-defunct "wait_for_threads_to_shutdown" + feature of ACE_Task. + + * ace/Task_T: Removed the recent feature added on + + Sat Jan 8 09:44:51 2000 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu> + + that allowed to an ACE_Task's destructor to wait for threads in + a task to exit. It turns out this is practically impossible to + use correctly because of the way that destructors are destroyed + from the "top down". However, it's trivial to get the same + behavior by simply calling the Tasks's wait() method whenever + you want to implement barrier synchronization on a Task's thread + exits. + + * tests/Thread_Pool_Test.cpp: Updated this test to illustrate + various strategies to wait for threads to exit. Thanks to Mark + C. Barnes <marcus@muse3d.com> for motivating this example. + Mon Jan 17 14:20:17 2000 Nanbor Wang <nanbor@cs.wustl.edu> * ace/OS.h: @@ -273,7 +303,7 @@ Sun Jan 9 00:25:58 2000 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu> reporting this problem with VxWorks. * ace/Pair_T.h: Reformatted to conform to the ACE style. - + Sat Jan 8 09:44:51 2000 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu> * examples/Threads/thread_pool.cpp: Revised the example to diff --git a/ace/Message_Queue.h b/ace/Message_Queue.h index 75632ba2273..d028bcb999c 100644 --- a/ace/Message_Queue.h +++ b/ace/Message_Queue.h @@ -63,27 +63,35 @@ public: // = Enqueue and dequeue methods. - // For the following enqueue and dequeue methods if <timeout> == 0, - // the caller will block until action is possible, else will wait - // until the absolute time specified in *<timeout> elapses). These - // calls will return, however, when queue is closed, deactivated, - // when a signal occurs, or if the time specified in timeout - // elapses, (in which case errno = EWOULDBLOCK). + // For the following enqueue and dequeue methods, the caller will + // block until action is possible if <timeout> == 0. Otherwise, it + // will wait until the absolute time specified in *<timeout> + // elapses. These calls will -1 when queue is closed, deactivated + // (in which case <errno> == <ESHUTDOWN>), when a signal occurs (in + // which case <errno> == <EINTR>, or if the time specified in + // timeout elapses (in which case <errno> == <EWOULDBLOCK>). virtual int enqueue_tail (ACE_Message_Block *new_item, ACE_Time_Value *timeout = 0) = 0; + // Enqueue a <ACE_Message_Block *> into the tail of the queue. + // Returns number of items in queue if the call succeeds or -1 + // otherwise. virtual int enqueue (ACE_Message_Block *new_item, ACE_Time_Value *timeout = 0) = 0; // Enqueue a <ACE_Message_Block *> into the tail of the queue. - // Return -1 on failure, number of items in queue otherwise. + // Returns number of items in queue if the call succeeds or -1 + // otherwise. virtual int dequeue_head (ACE_Message_Block *&first_item, ACE_Time_Value *timeout = 0) = 0; + // Dequeue and return the <ACE_Message_Block *> at the head of the + // queue. Returns number of items in queue if the call succeeds or + // -1 otherwise. virtual int dequeue (ACE_Message_Block *&first_item, ACE_Time_Value *timeout = 0) = 0; // Dequeue and return the <ACE_Message_Block *> at the head of the - // queue. Returns -1 on failure, else the number of items still on - // the queue. + // queue. Returns number of items in queue if the call succeeds or + // -1 otherwise. // = Check if queue is full/empty. virtual int is_full (void) = 0; diff --git a/ace/Message_Queue_T.h b/ace/Message_Queue_T.h index 6543244d54a..31af1dea39a 100644 --- a/ace/Message_Queue_T.h +++ b/ace/Message_Queue_T.h @@ -10,7 +10,7 @@ // Message_Queue_T.h // // = AUTHOR -// Doug Schmidt +// Douglas C. Schmidt <schmidt@cs.wustl.edu> // // ============================================================================ diff --git a/ace/Task_T.cpp b/ace/Task_T.cpp index 88a9354396b..17c4612e67e 100644 --- a/ace/Task_T.cpp +++ b/ace/Task_T.cpp @@ -45,14 +45,12 @@ ACE_Task<ACE_SYNCH_USE>::dump (void) const template<ACE_SYNCH_DECL> ACE_Task<ACE_SYNCH_USE>::ACE_Task (ACE_Thread_Manager *thr_man, - ACE_Message_Queue<ACE_SYNCH_USE> *mq, - int wait_for_threads_in_destructor) + ACE_Message_Queue<ACE_SYNCH_USE> *mq) : ACE_Task_Base (thr_man), msg_queue_ (0), delete_msg_queue_ (0), mod_ (0), - next_ (0), - wait_for_threads_in_destructor_ (wait_for_threads_in_destructor) + next_ (0) { ACE_TRACE ("ACE_Task<ACE_SYNCH_USE>::ACE_Task"); @@ -70,13 +68,6 @@ template<ACE_SYNCH_DECL> ACE_Task<ACE_SYNCH_USE>::~ACE_Task (void) { ACE_TRACE ("ACE_Task<ACE_SYNCH_USE>::~ACE_Task"); - // Wait for any and all threads in this task to exit. - if (this->wait_for_threads_in_destructor_ != 0 - && this->wait () == -1) - ACE_ERROR ((LM_ERROR, - ASYS_TEXT ("(%t) %p\n"), - ASYS_TEXT ("wait() failed in ~ACE_Task_Base"))); - if (this->delete_msg_queue_) delete this->msg_queue_; diff --git a/ace/Task_T.h b/ace/Task_T.h index f0950e91a0e..369032ea953 100644 --- a/ace/Task_T.h +++ b/ace/Task_T.h @@ -45,14 +45,11 @@ public: // = Initialization/termination methods. ACE_Task (ACE_Thread_Manager *thr_mgr = 0, - ACE_Message_Queue<ACE_SYNCH_USE> *mq = 0, - int wait_for_threads_in_destructor = 0); + ACE_Message_Queue<ACE_SYNCH_USE> *mq = 0); // Initialize a Task, supplying a thread manager and a message // queue. If the user doesn't supply a ACE_Message_Queue pointer // then we'll allocate one dynamically. Otherwise, we'll use the - // one passed as a parameter. If <wait_for_threads_in_destructor> - // is non-0 then <~ACE_Task> will wait for any threads in this task - // to exit before returning. + // one passed as a parameter. virtual ~ACE_Task (void); // Destructor. @@ -140,10 +137,6 @@ public: // Should be protected: ACE_Task<ACE_SYNCH_USE> *next_; // Pointer to adjacent ACE_Task. - int wait_for_threads_in_destructor_; - // If this is non-0 then wait for threads in this task to exit - // before returning from the destructor. - void dump (void) const; // Dump the state of an object. diff --git a/examples/Threads/thread_pool.cpp b/examples/Threads/thread_pool.cpp index 8bd09996ed8..166a0e3d1cc 100644 --- a/examples/Threads/thread_pool.cpp +++ b/examples/Threads/thread_pool.cpp @@ -1,11 +1,11 @@ // $Id$ -// This test program illustrates how the ACE task synchronization -// mechanisms work in conjunction with the ACE_Task and the -// ACE_Thread_Manager. If the <manual> flag is set input comes from -// stdin until the user enters a return -- otherwise, the input is -// generated automatically. All worker threads shutdown when they -// receive a message block of length 0. +// This test program illustrates how the <ACE_Task> synchronization +// mechanisms work in conjunction with the <ACE_Thread_Manager>. If +// the <manual> flag is set input comes from stdin until the user +// enters a return -- otherwise, the input is generated automatically. +// All worker threads shutdown when they receive a message block of +// length 0. // // This code is original based on a test program written by Karlheinz // Dorn <Karlheinz.Dorn@med.siemens.de>. It was modified to utilize @@ -61,9 +61,7 @@ Thread_Pool::close (u_long) Thread_Pool::Thread_Pool (ACE_Thread_Manager *thr_mgr, int n_threads) - : ACE_Task<ACE_MT_SYNCH> (thr_mgr, - 0, - 1) // Wait in destructor for threads to exit... + : ACE_Task<ACE_MT_SYNCH> (thr_mgr) { // Create the pool of worker threads. if (this->activate (THR_NEW_LWP, @@ -197,11 +195,11 @@ producer (Thread_Pool &thread_pool) } else { - // Send a shutdown message to the waiting threads and exit. ACE_DEBUG ((LM_DEBUG, "\n(%t) start loop, dump of task:\n")); // thread_pool.dump (); + // Send a shutdown message to the waiting threads and exit. for (int i = thread_pool.thr_count (); i > 0; i--) { ACE_DEBUG ((LM_DEBUG, @@ -239,18 +237,19 @@ main (int argc, char *argv[]) argc, n_threads)); - { - // Create the worker tasks. - Thread_Pool thread_pool (ACE_Thread_Manager::instance (), - n_threads); + // Create the worker tasks. + Thread_Pool thread_pool (ACE_Thread_Manager::instance (), + n_threads); - // Create work for the worker tasks to process in their own threads. - producer (thread_pool); + // Create work for the worker tasks to process in their own threads. + producer (thread_pool); - // Wait for all the threads to reach their exit point. - ACE_DEBUG ((LM_DEBUG, - "(%t) waiting for threads to exit in Thread_Pool destructor...\n")); - } + ACE_DEBUG ((LM_DEBUG, + "(%t) waiting for threads to exit in Thread_Pool destructor...\n")); + // Wait for all the threads to reach their exit point. + if (thread_pool.wait () == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) wait() failed\n"), + 1); ACE_DEBUG ((LM_DEBUG, "(%t) destroying worker tasks and exiting...\n")); diff --git a/tests/Thread_Pool_Test.cpp b/tests/Thread_Pool_Test.cpp index a66f310040a..165d0d2ad99 100644 --- a/tests/Thread_Pool_Test.cpp +++ b/tests/Thread_Pool_Test.cpp @@ -9,13 +9,13 @@ // Thread_Pool_Test.cpp // // = DESCRIPTION -// This test program illustrates how the ACE task synchronization -// mechanisms and ACE_Message_Block reference counting works in -// conjunction with the ACE_Task and the ACE_Thread_Manager. If -// the manual flag is not set input comes from stdin until the -// user enters a return. This stops all workers via a message -// block of length 0. This shows an alternative way to shutdown -// worker tasks compared to queue deactivate. +// This test program illustrates how the <ACE_Task> +// synchronization mechanisms work in conjunction with the +// <ACE_Thread_Manager>. If the <manual> flag is set input comes +// from stdin until the user enters a return -- otherwise, the +// input is generated automatically. All worker threads shutdown +// when (1) they receive a message block of length 0 or (2) the +// queue is deactivated. // // = AUTHOR // Karlheinz Dorn <Karlheinz.Dorn@med.siemens.de>, @@ -39,32 +39,49 @@ USELIB("..\ace\aced.lib"); // Number of iterations to run the test. static size_t n_iterations = 100; +// Controls whether the input is generated "manually" or automatically. +static int manual = 0; + class Thread_Pool : public ACE_Task<ACE_MT_SYNCH> { + // = TITLE + // Defines a thread pool abstraction based on the <ACE_Task>. public: Thread_Pool (int n_threads); // Create the thread pool containing <n_threads>. ~Thread_Pool (void); + // Destructor... - virtual int open (void * = 0); - // Produce the messages that are consumed by the threads in the - // thread pool. + int test_queue_deactivation_shutdown (void); + // Activate the task's thread pool, produce the messages that are + // consumed by the threads in the thread pool, and demonstate how to + // shutdown using the <ACE_Message_Queue::deactivate> method. + + int test_empty_message_shutdown (void); + // Activate the task's thread pool, produce the messages that are, + // produce the messages that are consumed by the threads in the + // thread pool, and demonstrate how to shutdown by enqueueing + // "empty" messages into the queue. virtual int svc (void); // Iterate <n_iterations> time printing off a message and "waiting" // for all other threads to complete this iteration. - virtual int put (ACE_Message_Block *mb, ACE_Time_Value *tv = 0); + virtual int put (ACE_Message_Block *mb, + ACE_Time_Value *tv = 0); // Allows the producer to pass messages to the <Thread_Pool>. private: + virtual int open (void * = 0); + // Spawn the threads in the pool. + virtual int close (u_long); // Close hook. ACE_Lock_Adapter<ACE_Thread_Mutex> lock_adapter_; // Serialize access to <ACE_Message_Block> reference count, which - // will be decremented from multiple threads. + // will be decremented by multiple threads. int n_threads_; // Number of threads to spawn. @@ -78,7 +95,7 @@ int Thread_Pool::close (u_long) { ACE_DEBUG ((LM_DEBUG, - ASYS_TEXT ("(%t) close of worker\n"))); + ASYS_TEXT ("(%t) worker thread closing down\n"))); return 0; } @@ -90,7 +107,8 @@ Thread_Pool::Thread_Pool (int n_threads) // Simply enqueue the Message_Block into the end of the queue. int -Thread_Pool::put (ACE_Message_Block *mb, ACE_Time_Value *tv) +Thread_Pool::put (ACE_Message_Block *mb, + ACE_Time_Value *tv) { return this->putq (mb, tv); } @@ -109,13 +127,27 @@ Thread_Pool::svc (void) ACE_Message_Block *mb; int result = this->getq (mb); - ACE_ASSERT (result != -1); + + ACE_ASSERT (result != -1 || errno == ESHUTDOWN); + + if (result == -1 && errno == ESHUTDOWN) + { + // The queue has been deactivated, so let's bail out. + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT ("(%t) in iteration %d, queue len = %d, ") + ASYS_TEXT ("queue deactivated, exiting\n"), + count, + this->msg_queue ()->message_count ())); + + break; + } int length = mb->length (); if (length > 0) ACE_DEBUG ((LM_DEBUG, - ASYS_TEXT ("(%t) in iteration %d, queue len = %d, length = %d, text = \"%*s\"\n"), + ASYS_TEXT ("(%t) in iteration %d, queue len = %d, ") + ASYS_TEXT ("length = %d, text = \"%*s\"\n"), count, this->msg_queue ()->message_count (), length, @@ -128,7 +160,8 @@ Thread_Pool::svc (void) if (length == 0) { ACE_DEBUG ((LM_DEBUG, - ASYS_TEXT ("(%t) in iteration %d, queue len = %d, got NULL message, exiting\n"), + ASYS_TEXT ("(%t) in iteration %d, queue len = %d, ") + ASYS_TEXT ("got \"empty\" message, exiting\n"), count, this->msg_queue ()->message_count ())); break; @@ -154,13 +187,30 @@ Thread_Pool::open (void *) ASYS_TEXT ("%p\n"), ASYS_TEXT ("activate failed")), -1); + return 0; +} + +// Activate the task's thread pool, produce the messages that are +// consumed by the threads in the thread pool, and demonstate how to +// shutdown using the <ACE_Message_Queue::deactivate> method. + +int +Thread_Pool::test_queue_deactivation_shutdown (void) +{ + if (this->open () == -1) + return -1; ACE_Message_Block *mb = 0; + // Run the main loop that generates messages and enqueues them into + // the pool of threads managed by <ACE_Task>. + for (size_t count = 0; - count < n_iterations; + ; count++) { + ssize_t n = 0; + // Allocate a new message. ACE_NEW_RETURN (mb, ACE_Message_Block (BUFSIZ, @@ -171,67 +221,178 @@ Thread_Pool::open (void *) &this->lock_adapter_), -1); - ACE_OS::sprintf ((ASYS_TCHAR *) mb->rd_ptr (), - ASYS_TEXT ("%d\n"), - count); - int n = ACE_OS::strlen ((ASYS_TCHAR *) mb->rd_ptr ()); + if (manual) + { + ACE_DEBUG ((LM_DEBUG, + "(%t) enter a new message for the task pool...")); + n = ACE_OS::read (ACE_STDIN, + mb->rd_ptr (), + mb->size ()); + } + else + { + static int count = 0; + + ACE_OS::sprintf (mb->rd_ptr (), + "%d\n", + count); + n = ACE_OS::strlen (mb->rd_ptr ()); - if (count == 0 || (count % 20 == 0)) - ACE_OS::sleep (1); + if (count == n_iterations) + n = 1; // Indicate that we need to shut down. + else + count++; - // Send a normal message to the waiting threads and continue - // producing. - mb->wr_ptr (n * sizeof (ASYS_TCHAR)); + if (count == 0 || (count % 20 == 0)) + ACE_OS::sleep (1); + } - // Pass the message to the Thread_Pool. - if (this->put (mb) == -1) - ACE_ERROR ((LM_ERROR, - ASYS_TEXT (" (%t) %p\n"), - ASYS_TEXT ("put"))); + if (n > 1) + { + // Send a normal message to the waiting threads and continue + // producing. + mb->wr_ptr (n * sizeof (ASYS_TCHAR)); + + // Pass the message to the Thread_Pool. + if (this->put (mb) == -1) + ACE_ERROR ((LM_ERROR, + ASYS_TEXT (" (%t) %p\n"), + ASYS_TEXT ("put"))); + } + else + { + // Deactivate the message queue and return. + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT ("\n(%t) deactivating queue for %d threads, ") + ASYS_TEXT ("dump of task:\n"), + this->thr_count ())); + this->dump (); + + // Deactivate the queue. + return this->msg_queue ()->deactivate (); + } } +} - // Send a shutdown message to the waiting threads and exit. - ACE_DEBUG ((LM_DEBUG, - ASYS_TEXT ("\n(%t) sending shutdown message to %d threads, dump of task:\n"), - this->thr_count ())); - this->dump (); +// Activate the task's thread pool, produce the messages that are, +// produce the messages that are consumed by the threads in the thread +// pool, and demonstrate how to shutdown by enqueueing "empty" +// messages into the queue. + +int +Thread_Pool::test_empty_message_shutdown (void) +{ + if (this->open () == -1) + return -1; + + ACE_Message_Block *mb = 0; + + // Run the main loop that generates messages and enqueues them into + // the pool of threads managed by <ACE_Task>. - ACE_NEW_RETURN (mb, - ACE_Message_Block (0, - ACE_Message_Block::MB_DATA, - 0, - 0, - 0, - &this->lock_adapter_), - -1); - int i = 0; - - for (i = this->thr_count (); - i > 0; - i--) + for (size_t count = 0; + ; + count++) { - ACE_DEBUG ((LM_DEBUG, - ASYS_TEXT ("(%t) EOF, enqueueing NULL block for thread = %d\n"), - i)); - - // Enqueue an empty message to flag each consumer to shutdown. - // Note that we use reference counting to avoid having to copy - // the message. - ACE_Message_Block *dup = mb->duplicate (); - - if (this->put (dup) == -1) - ACE_ERROR ((LM_ERROR, - ASYS_TEXT (" (%t) %p\n"), - ASYS_TEXT ("put"))); - } + ssize_t n = 0; - mb->release (); + // Allocate a new message. + ACE_NEW_RETURN (mb, + ACE_Message_Block (BUFSIZ, + ACE_Message_Block::MB_DATA, + 0, + 0, + 0, + &this->lock_adapter_), + -1); - ACE_DEBUG ((LM_DEBUG, - ASYS_TEXT ("\n(%t) end loop, dump of task:\n"))); - this->dump (); + if (manual) + { + ACE_DEBUG ((LM_DEBUG, + "(%t) enter a new message for the task pool...")); + n = ACE_OS::read (ACE_STDIN, + mb->rd_ptr (), + mb->size ()); + } + else + { + static int count = 0; - return 0; + ACE_OS::sprintf (mb->rd_ptr (), + "%d\n", + count); + n = ACE_OS::strlen (mb->rd_ptr ()); + + if (count == n_iterations) + n = 1; // Indicate that we need to shut down. + else + count++; + + if (count == 0 || (count % 20 == 0)) + ACE_OS::sleep (1); + } + + if (n > 1) + { + // Send a normal message to the waiting threads and continue + // producing. + mb->wr_ptr (n * sizeof (ASYS_TCHAR)); + + // Pass the message to the Thread_Pool. + if (this->put (mb) == -1) + ACE_ERROR ((LM_ERROR, + ASYS_TEXT (" (%t) %p\n"), + ASYS_TEXT ("put"))); + } + else + { + // Send a shutdown message to the waiting threads and return. + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT ("\n(%t) sending shutdown message to %d threads, ") + ASYS_TEXT ("dump of task:\n"), + this->thr_count ())); + this->dump (); + + ACE_NEW_RETURN (mb, + ACE_Message_Block (0, + ACE_Message_Block::MB_DATA, + 0, + 0, + 0, + &this->lock_adapter_), + -1); + int i = 0; + + // Enqueue an empty message to flag each consumer thread to + // inform it to shutdown. + for (i = this->thr_count (); + i > 0; + i--) + { + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT ("(%t) end of input, ") + ASYS_TEXT ("enqueueing \"empty\" message %d\n"), + i)); + + // Note the use of reference counting to avoid copying + // the message contents. + ACE_Message_Block *dup = mb->duplicate (); + + if (this->put (dup) == -1) + ACE_ERROR ((LM_ERROR, + ASYS_TEXT (" (%t) %p\n"), + ASYS_TEXT ("put"))); + } + + mb->release (); + + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT ("\n(%t) end loop, dump of task:\n"))); + this->dump (); + + return 0; + } + } } #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) @@ -250,28 +411,50 @@ main (int, ASYS_TCHAR *[]) #if defined (ACE_HAS_THREADS) int n_threads = ACE_MAX_THREADS; + // Create the worker tasks. + Thread_Pool thread_pool (n_threads); + ACE_DEBUG ((LM_DEBUG, - ASYS_TEXT ("(%t) threads = %d\n"), + ASYS_TEXT ("(%t) running test with %d threads\n"), n_threads)); - // Create the worker tasks. - Thread_Pool thread_pool (n_threads); + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT ("(%t) starting empty message shutdown test\n"))); - // Create work for the worker tasks to process in their own threads. - if (thread_pool.open () == -1) + // Activate the task's thread pool, produce the messages that are, + // produce the messages that are consumed by the threads in the + // thread pool, and demonstrate how to shutdown by enqueueing + // "empty" messages into the queue. + if (thread_pool.test_empty_message_shutdown () == -1) return 1; - // Wait for all the threads to reach their exit point. - ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("(%t) waiting for worker tasks to finish...\n"))); + // Wait for all the threads to reach their exit point, at which + // point the barrier in the destructor of the <ACE_Task> portion of + // <Thread_Pool> will return. + if (thread_pool.wait () == -1) + return 1; + + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT ("(%t) starting queue deactivation shutdown test\n"))); - ACE_Thread_Manager::instance ()->wait (); + // Activate the task's thread pool, produce the messages that are + // consumed by the threads in the thread pool, and demonstate how to + // shutdown using the <ACE_Message_Queue::deactivate> method. + if (thread_pool.test_queue_deactivation_shutdown () == -1) + return 1; - ACE_ASSERT (thread_pool.msg_queue ()->is_empty ()); ACE_DEBUG ((LM_DEBUG, - ASYS_TEXT ("(%t) destroying worker tasks and exiting...\n"))); + ASYS_TEXT ("(%t) waiting for worker tasks to finish...\n"))); + // Wait for all the threads to reach their exit point, at which + // point the barrier in the destructor of the <ACE_Task> portion of + // <Thread_Pool> will return. + if (thread_pool.wait () == -1) + return 1; + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT ("(%t) all worker tasks destroyed, exiting test...\n"))); #else ACE_ERROR ((LM_INFO, ASYS_TEXT ("threads not supported on this platform\n"))); |