diff options
author | schmidt <douglascraigschmidt@users.noreply.github.com> | 2000-01-08 22:08:47 +0000 |
---|---|---|
committer | schmidt <douglascraigschmidt@users.noreply.github.com> | 2000-01-08 22:08:47 +0000 |
commit | 9689e9d7f495f4a4a00dd2536af628dd39bb96ee (patch) | |
tree | 075c88aa547e84b88698421e8d3d584ce2e2156d | |
parent | 4e20bf8f28a77b5707a74d9ec53949996b61ee3d (diff) | |
download | ATCD-9689e9d7f495f4a4a00dd2536af628dd39bb96ee.tar.gz |
ChangeLogTag:Sat Jan 8 09:44:51 2000 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu>
-rw-r--r-- | ChangeLog | 12 | ||||
-rw-r--r-- | ChangeLogs/ChangeLog-02a | 12 | ||||
-rw-r--r-- | ChangeLogs/ChangeLog-03a | 12 | ||||
-rw-r--r-- | ace/Proactor.cpp | 2 | ||||
-rw-r--r-- | ace/Task.cpp | 1 | ||||
-rw-r--r-- | ace/Task_T.cpp | 13 | ||||
-rw-r--r-- | ace/Task_T.h | 11 | ||||
-rw-r--r-- | ace/Thread_Manager.cpp | 23 | ||||
-rw-r--r-- | ace/Thread_Manager.h | 3 | ||||
-rw-r--r-- | examples/Threads/task_four.cpp | 122 | ||||
-rw-r--r-- | examples/Threads/task_three.cpp | 3 | ||||
-rw-r--r-- | examples/Threads/thread_pool.cpp | 189 |
12 files changed, 284 insertions, 119 deletions
diff --git a/ChangeLog b/ChangeLog index 7802d35ecda..f9534e10793 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,5 +1,17 @@ Sat Jan 8 09:44:51 2000 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu> + * examples/Threads/thread_pool.cpp: Revised the example to + illustrate the new "wait_for_threads_in_destructor" feature of + ACE_Task. + + * examples/Threads/task_four.cpp: Reformatted the code. + + * ace/Task_T: Added a new flag to the constructor that enables + applications to request that an ACE_Task will wait in its + destructor for any and all threads in the task to exit before + returning. Thanks to Valery Arkhangorodsky + <valerya@servicesoft.com> for suggesting this. + * ace/OS.i: Needed to add an extern "C" {} block around the setregid() and setreuid() functions. Thanks to David Levine for reporting this. diff --git a/ChangeLogs/ChangeLog-02a b/ChangeLogs/ChangeLog-02a index 7802d35ecda..f9534e10793 100644 --- a/ChangeLogs/ChangeLog-02a +++ b/ChangeLogs/ChangeLog-02a @@ -1,5 +1,17 @@ Sat Jan 8 09:44:51 2000 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu> + * examples/Threads/thread_pool.cpp: Revised the example to + illustrate the new "wait_for_threads_in_destructor" feature of + ACE_Task. + + * examples/Threads/task_four.cpp: Reformatted the code. + + * ace/Task_T: Added a new flag to the constructor that enables + applications to request that an ACE_Task will wait in its + destructor for any and all threads in the task to exit before + returning. Thanks to Valery Arkhangorodsky + <valerya@servicesoft.com> for suggesting this. + * ace/OS.i: Needed to add an extern "C" {} block around the setregid() and setreuid() functions. Thanks to David Levine for reporting this. diff --git a/ChangeLogs/ChangeLog-03a b/ChangeLogs/ChangeLog-03a index 7802d35ecda..f9534e10793 100644 --- a/ChangeLogs/ChangeLog-03a +++ b/ChangeLogs/ChangeLog-03a @@ -1,5 +1,17 @@ Sat Jan 8 09:44:51 2000 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu> + * examples/Threads/thread_pool.cpp: Revised the example to + illustrate the new "wait_for_threads_in_destructor" feature of + ACE_Task. + + * examples/Threads/task_four.cpp: Reformatted the code. + + * ace/Task_T: Added a new flag to the constructor that enables + applications to request that an ACE_Task will wait in its + destructor for any and all threads in the task to exit before + returning. Thanks to Valery Arkhangorodsky + <valerya@servicesoft.com> for suggesting this. + * ace/OS.i: Needed to add an extern "C" {} block around the setregid() and setreuid() functions. Thanks to David Levine for reporting this. diff --git a/ace/Proactor.cpp b/ace/Proactor.cpp index 6089b432e8f..477762498e0 100644 --- a/ace/Proactor.cpp +++ b/ace/Proactor.cpp @@ -275,7 +275,7 @@ ACE_Proactor::ACE_Proactor (ACE_Proactor_Impl *implementation, ACE_NEW (this->timer_handler_, ACE_Proactor_Timer_Handler (*this)); - // Activate <timer_handler> + // Activate <timer_handler>. if (this->timer_handler_->activate (THR_NEW_LWP | THR_DETACHED) == -1) ACE_ERROR ((LM_ERROR, ASYS_TEXT ("%N:%l:(%P | %t):%p\n"), diff --git a/ace/Task.cpp b/ace/Task.cpp index 20f4d7d5595..91a7588d65b 100644 --- a/ace/Task.cpp +++ b/ace/Task.cpp @@ -26,6 +26,7 @@ ACE_Task_Base::ACE_Task_Base (ACE_Thread_Manager *thr_man) } // Wait for all threads running in a task to exit. + int ACE_Task_Base::wait (void) { diff --git a/ace/Task_T.cpp b/ace/Task_T.cpp index 17c4612e67e..88a9354396b 100644 --- a/ace/Task_T.cpp +++ b/ace/Task_T.cpp @@ -45,12 +45,14 @@ ACE_Task<ACE_SYNCH_USE>::dump (void) const template<ACE_SYNCH_DECL> ACE_Task<ACE_SYNCH_USE>::ACE_Task (ACE_Thread_Manager *thr_man, - ACE_Message_Queue<ACE_SYNCH_USE> *mq) + ACE_Message_Queue<ACE_SYNCH_USE> *mq, + int wait_for_threads_in_destructor) : ACE_Task_Base (thr_man), msg_queue_ (0), delete_msg_queue_ (0), mod_ (0), - next_ (0) + next_ (0), + wait_for_threads_in_destructor_ (wait_for_threads_in_destructor) { ACE_TRACE ("ACE_Task<ACE_SYNCH_USE>::ACE_Task"); @@ -68,6 +70,13 @@ template<ACE_SYNCH_DECL> ACE_Task<ACE_SYNCH_USE>::~ACE_Task (void) { ACE_TRACE ("ACE_Task<ACE_SYNCH_USE>::~ACE_Task"); + // Wait for any and all threads in this task to exit. + if (this->wait_for_threads_in_destructor_ != 0 + && this->wait () == -1) + ACE_ERROR ((LM_ERROR, + ASYS_TEXT ("(%t) %p\n"), + ASYS_TEXT ("wait() failed in ~ACE_Task_Base"))); + if (this->delete_msg_queue_) delete this->msg_queue_; diff --git a/ace/Task_T.h b/ace/Task_T.h index 0fb31c4aa78..f0950e91a0e 100644 --- a/ace/Task_T.h +++ b/ace/Task_T.h @@ -45,11 +45,14 @@ public: // = Initialization/termination methods. ACE_Task (ACE_Thread_Manager *thr_mgr = 0, - ACE_Message_Queue<ACE_SYNCH_USE> *mq = 0); + ACE_Message_Queue<ACE_SYNCH_USE> *mq = 0, + int wait_for_threads_in_destructor = 0); // Initialize a Task, supplying a thread manager and a message // queue. If the user doesn't supply a ACE_Message_Queue pointer // then we'll allocate one dynamically. Otherwise, we'll use the - // one they give. + // one passed as a parameter. If <wait_for_threads_in_destructor> + // is non-0 then <~ACE_Task> will wait for any threads in this task + // to exit before returning. virtual ~ACE_Task (void); // Destructor. @@ -137,6 +140,10 @@ public: // Should be protected: ACE_Task<ACE_SYNCH_USE> *next_; // Pointer to adjacent ACE_Task. + int wait_for_threads_in_destructor_; + // If this is non-0 then wait for threads in this task to exit + // before returning from the destructor. + void dump (void) const; // Dump the state of an object. diff --git a/ace/Thread_Manager.cpp b/ace/Thread_Manager.cpp index 7447f3bdf8b..bcce9716399 100644 --- a/ace/Thread_Manager.cpp +++ b/ace/Thread_Manager.cpp @@ -1902,7 +1902,7 @@ ACE_Thread_Manager::apply_task (ACE_Task_Base *task, return result; } -// Wait for task +// Wait for all threads to exit a task. int ACE_Thread_Manager::wait_task (ACE_Task_Base *task) @@ -1911,7 +1911,7 @@ ACE_Thread_Manager::wait_task (ACE_Task_Base *task) ACE_Thread_Descriptor_Base *copy_table = 0; // We have to make sure that while we wait for these threads to - // exit, we do not have the lock. Therefore we make a copy of all + // exit, we do not have the lock. Therefore we make a copy of all // interesting entries and let go of the lock. { ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1)); @@ -1930,12 +1930,16 @@ ACE_Thread_Manager::wait_task (ACE_Task_Base *task) for (ACE_Double_Linked_List_Iterator<ACE_Thread_Descriptor> iter (this->thr_list_); !iter.done (); iter.advance ()) - // If threads are created as THR_DETACHED or THR_DAEMON, we can't help much here. + // If threads are created as THR_DETACHED or THR_DAEMON, we + // can't wait on them here. if (iter.next ()->task_ == task && - (ACE_BIT_DISABLED (iter.next ()->flags_, THR_DETACHED | THR_DAEMON) - || ACE_BIT_ENABLED (iter.next ()->flags_, THR_JOINABLE))) + (ACE_BIT_DISABLED (iter.next ()->flags_, + THR_DETACHED | THR_DAEMON) + || ACE_BIT_ENABLED (iter.next ()->flags_, + THR_JOINABLE))) { - ACE_SET_BITS (iter.next ()->thr_state_, ACE_THR_JOINING); + ACE_SET_BITS (iter.next ()->thr_state_, + ACE_THR_JOINING); copy_table[copy_count++] = *iter.next (); } @@ -1946,7 +1950,8 @@ ACE_Thread_Manager::wait_task (ACE_Task_Base *task) // If threads are created as THR_DETACHED or THR_DAEMON, we can't help much here. if (titer.next ()->task_ == task) { - ACE_Thread_Descriptor_Base *tdb = titer.advance_and_remove (0); + ACE_Thread_Descriptor_Base *tdb = + titer.advance_and_remove (0); copy_table[copy_count++] = *tdb; delete tdb; } @@ -1956,7 +1961,9 @@ ACE_Thread_Manager::wait_task (ACE_Task_Base *task) // Now to do the actual work int result = 0; - for (int i = 0; i < copy_count && result != -1; i++) + for (int i = 0; + i < copy_count && result != -1; + i++) { if (ACE_Thread::join (copy_table[i].thr_handle_) == -1) result = -1; diff --git a/ace/Thread_Manager.h b/ace/Thread_Manager.h index 08c85fabaac..d35afbedef4 100644 --- a/ace/Thread_Manager.h +++ b/ace/Thread_Manager.h @@ -603,9 +603,10 @@ public: // resembles <suspend_thr>. // = Operations on ACE_Tasks. + int wait_task (ACE_Task_Base *task); // Block until there are no more threads running in <task>. Returns - // 0 on success and -1 on failure. Notice that wait_task will not + // 0 on success and -1 on failure. Note that <wait_task> will not // wait on detached threads. int suspend_task (ACE_Task_Base *task); // Suspend all threads in an ACE_Task. diff --git a/examples/Threads/task_four.cpp b/examples/Threads/task_four.cpp index 177a9950b7b..42ad56d9c06 100644 --- a/examples/Threads/task_four.cpp +++ b/examples/Threads/task_four.cpp @@ -1,7 +1,7 @@ // $Id$ // The following test was written by Hamutal Yanay & Ari Erev's -// (Ari_Erev@comverse.com). +// (Ari_Erev@comverse.com). // // This test program test enhancements to the thread_manager and task // classes. The purpose of these enhancements was to allow the @@ -85,7 +85,9 @@ size_t Worker_Task::workers_count_ = 1; int Worker_Task::close (u_long) { - ACE_DEBUG ((LM_DEBUG, "(%t) closing task %d\n", this->index_)); + ACE_DEBUG ((LM_DEBUG, + "(%t) closing task %d\n", + this->index_)); delete this; return 0; } @@ -103,24 +105,35 @@ Worker_Task::Worker_Task (ACE_Thread_Manager *thr_mgr, int Worker_Task::open (void *) { - // Create worker threads. - return this->activate (THR_NEW_LWP, n_threads_, 0, -1, -1, this); + // Create the pool of worker threads. + return this->activate (THR_NEW_LWP, + n_threads_, + 0, + -1, + -1, + this); } int Worker_Task::svc (void) { - ACE_DEBUG ((LM_DEBUG, " (%t) in worker %d\n", index_)); + ACE_DEBUG ((LM_DEBUG, + " (%t) in worker %d\n", + index_)); for (size_t iterations = 1; iterations <= this->n_iterations_; iterations++) { - ACE_DEBUG ((LM_DEBUG, " (%t) in iteration %d\n", iterations)); + ACE_DEBUG ((LM_DEBUG, + " (%t) in iteration %d\n", + iterations)); ACE_OS::sleep (0); } - ACE_DEBUG ((LM_DEBUG, " (%t) worker %d ends\n", index_)); + ACE_DEBUG ((LM_DEBUG, + " (%t) worker %d ends\n", + index_)); return 0; } @@ -134,9 +147,16 @@ Invoker_Task::Invoker_Task (ACE_Thread_Manager *thr_mgr, n_threads_ (n_threads), n_iterations_ (n_iterations) { - // Create worker threads. - if (this->activate (THR_NEW_LWP, 1, 0, -1, -1, this) == -1) - ACE_ERROR ((LM_ERROR, "%p\n", "activate failed")); + // Create a single worker thread. + if (this->activate (THR_NEW_LWP, + 1, + 0, + -1, + -1, + this) == -1) + ACE_ERROR ((LM_ERROR, + "%p\n", + "activate failed")); } // Iterate <n_iterations> time printing off a message and "waiting" @@ -145,64 +165,84 @@ Invoker_Task::Invoker_Task (ACE_Thread_Manager *thr_mgr, int Invoker_Task::svc (void) { - // Note that the ACE_Task::svc_run () method automatically adds us to - // the Thread_Manager when the thread begins. + // Note that the ACE_Task::svc_run () method automatically adds us + // to the Thread_Manager when the thread begins. - ACE_Thread_Manager *thr_mgr = ACE_Thread_Manager::instance (); + ACE_Thread_Manager *thr_mgr = + ACE_Thread_Manager::instance (); Worker_Task **worker_task; - ACE_NEW_RETURN (worker_task, Worker_Task *[n_tasks_], -1); - + ACE_NEW_RETURN (worker_task, + Worker_Task *[n_tasks_], + -1); size_t task; for (task = 0; task < this->n_tasks_; task++) { - ACE_DEBUG ((LM_DEBUG, " (%t) in task %d\n", task+1)); + ACE_DEBUG ((LM_DEBUG, + " (%t) in task %d\n", + task + 1)); ACE_NEW_RETURN (worker_task[task], - Worker_Task (thr_mgr, n_threads_, n_iterations_), + Worker_Task (thr_mgr, + n_threads_, + n_iterations_), -1); if (worker_task[task]->open () == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open failed"), -1); + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "open failed"), + -1); } // Set all tasks to be one group - ACE_DEBUG ((LM_DEBUG, " (%t) setting tasks group id\n")); + ACE_DEBUG ((LM_DEBUG, + " (%t) setting tasks group id\n")); for (task = 0; task < this->n_tasks_; task++) - if (thr_mgr->set_grp (worker_task[task], 1) == -1) - ACE_ERROR ((LM_DEBUG, " (%t) %p\n", "set_grp")); - - size_t n_tasks = thr_mgr->num_tasks_in_group (1); - ACE_DEBUG ((LM_DEBUG, "Number of tasks in group 1: %d\n", n_tasks)) ; + if (thr_mgr->set_grp (worker_task[task], + 1) == -1) + ACE_ERROR ((LM_DEBUG, + " (%t) %p\n", + "set_grp")); + + size_t n_tasks = + thr_mgr->num_tasks_in_group (1); + ACE_DEBUG ((LM_DEBUG, + "Number of tasks in group 1: %d\n", + n_tasks)) ; // Wait for 1 second and then suspend every thread in the group. ACE_OS::sleep (1); - ACE_DEBUG ((LM_DEBUG, " (%t) suspending group\n")); + ACE_DEBUG ((LM_DEBUG, + " (%t) suspending group\n")); if (thr_mgr->suspend_grp (1) == -1) - ACE_ERROR ((LM_DEBUG, " (%t) %p\n", "suspend_grp")); + ACE_ERROR ((LM_DEBUG, + " (%t) %p\n", + "suspend_grp")); - // Wait for 3 more second and then resume every thread in the - // group. + // Wait for 3 more second and then resume every thread in the group. ACE_OS::sleep (ACE_Time_Value (2)); - ACE_DEBUG ((LM_DEBUG, " (%t) resuming group\n")); + ACE_DEBUG ((LM_DEBUG, + " (%t) resuming group\n")); if (thr_mgr->resume_grp (1) == -1) - ACE_ERROR ((LM_DEBUG, " (%t) %p\n", "resume_grp")); + ACE_ERROR ((LM_DEBUG, + " (%t) %p\n", + "resume_grp")); // Wait for all the tasks to reach their exit point. thr_mgr->wait (); // Note that the ACE_Task::svc_run () method automatically removes // us from the Thread_Manager when the thread exits. - return 0; } @@ -228,30 +268,38 @@ main (int argc, char *argv[]) // Wait for 1 second and then suspend the invoker task ACE_OS::sleep (1); - ACE_DEBUG ((LM_DEBUG, " (%t) suspending invoker task\n")); + ACE_DEBUG ((LM_DEBUG, + " (%t) suspending invoker task\n")); if (invoker_manager.suspend_task (&invoker) == -1) - ACE_ERROR ((LM_DEBUG, " (%t) %p\n", "suspend_task")); + ACE_ERROR ((LM_DEBUG, + " (%t) %p\n", + "suspend_task")); // Wait for 3 more second and then resume the invoker task. ACE_OS::sleep (ACE_Time_Value (3)); - ACE_DEBUG ((LM_DEBUG, " (%t) resuming invoker task\n")); + ACE_DEBUG ((LM_DEBUG, + " (%t) resuming invoker task\n")); if (invoker_manager.resume_task (&invoker) == -1) - ACE_ERROR ((LM_DEBUG, " (%t) %p\n", "resume_task")); + ACE_ERROR ((LM_DEBUG, + " (%t) %p\n", + "resume_task")); // Wait for all the threads to reach their exit point. invoker_manager.wait (); - ACE_DEBUG ((LM_DEBUG, " (%t) done\n")); + ACE_DEBUG ((LM_DEBUG, + " (%t) done\n")); return 0; } #else int main (int, char *[]) { - ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n")); + ACE_ERROR ((LM_ERROR, + "threads not supported on this platform\n")); return 0; } #endif /* ACE_HAS_THREADS */ diff --git a/examples/Threads/task_three.cpp b/examples/Threads/task_three.cpp index b6305330195..0bda095f67c 100644 --- a/examples/Threads/task_three.cpp +++ b/examples/Threads/task_three.cpp @@ -195,7 +195,8 @@ main (int argc, char **) Test_Task t1[TASK_COUNT]; Test_Task t2[TASK_COUNT]; - ACE_Thread::spawn (ACE_THR_FUNC (dispatch), reactor2); + ACE_Thread::spawn (ACE_THR_FUNC (dispatch), + reactor2); reactor1->owner (ACE_OS::thr_self ()); diff --git a/examples/Threads/thread_pool.cpp b/examples/Threads/thread_pool.cpp index 3ec15e51b46..8bd09996ed8 100644 --- a/examples/Threads/thread_pool.cpp +++ b/examples/Threads/thread_pool.cpp @@ -2,13 +2,14 @@ // This test program illustrates how the ACE task synchronization // mechanisms work in conjunction with the ACE_Task and the -// ACE_Thread_Manager. If the manual flag is not set input comes from -// stdin until the user enters a return only. This stops all workers -// via a message block of length 0. This is an alternative shutdown of -// workers compared to queue deactivate. +// ACE_Thread_Manager. If the <manual> flag is set input comes from +// stdin until the user enters a return -- otherwise, the input is +// generated automatically. All worker threads shutdown when they +// receive a message block of length 0. // // This code is original based on a test program written by Karlheinz -// Dorn. It was modified to utilize more "ACE" features by Doug Schmidt. +// Dorn <Karlheinz.Dorn@med.siemens.de>. It was modified to utilize +// more ACE features by Doug Schmidt <schmidt@cs.wustl.edu>. #include "ace/Task.h" #include "ace/Service_Config.h" @@ -19,45 +20,68 @@ ACE_RCSID(Threads, thread_pool, "$Id$") #if defined (ACE_HAS_THREADS) -// Number of iterations to run the test. +// Default number of iterations to run the test. static int n_iterations = 100; +// Controls whether the input is generated "manually" or automatically. +static int manual = 0; + class Thread_Pool : public ACE_Task<ACE_MT_SYNCH> { + // = TITLE + // Defines a thread pool abstraction based on the <ACE_Task>. public: - Thread_Pool (ACE_Thread_Manager *thr_mgr, int n_threads); - + Thread_Pool (ACE_Thread_Manager *thr_mgr, + int n_threads); + // Constructor activates <n_threads> in the thread pool. + + ~Thread_Pool (void); + // Destructor... + virtual int svc (void); // Iterate <n_iterations> time printing off a message and "waiting" // for all other threads to complete this iteration. - virtual int put (ACE_Message_Block *mb, ACE_Time_Value *tv=0); + virtual int put (ACE_Message_Block *mb, + ACE_Time_Value *tv = 0); // This allows the producer to pass messages to the <Thread_Pool>. private: virtual int close (u_long); + // Close hook. }; int Thread_Pool::close (u_long) { - ACE_DEBUG ((LM_DEBUG, "(%t) close of worker\n")); + ACE_DEBUG ((LM_DEBUG, + "(%t) worker thread closing down\n")); return 0; } Thread_Pool::Thread_Pool (ACE_Thread_Manager *thr_mgr, int n_threads) - : ACE_Task<ACE_MT_SYNCH> (thr_mgr) + : ACE_Task<ACE_MT_SYNCH> (thr_mgr, + 0, + 1) // Wait in destructor for threads to exit... +{ + // Create the pool of worker threads. + if (this->activate (THR_NEW_LWP, + n_threads) == -1) + ACE_ERROR ((LM_ERROR, + "%p\n", + "activate failed")); +} + +Thread_Pool::~Thread_Pool (void) { - // Create worker threads. - if (this->activate (THR_NEW_LWP, n_threads) == -1) - ACE_ERROR ((LM_ERROR, "%p\n", "activate failed")); } // Simply enqueue the Message_Block into the end of the queue. int -Thread_Pool::put (ACE_Message_Block *mb, ACE_Time_Value *tv) +Thread_Pool::put (ACE_Message_Block *mb, + ACE_Time_Value *tv) { return this->putq (mb, tv); } @@ -68,7 +92,7 @@ Thread_Pool::put (ACE_Message_Block *mb, ACE_Time_Value *tv) int Thread_Pool::svc (void) { - // Note that the ACE_Task::svc_run () method automatically adds us to + // Note that the <ACE_Task::svc_run> method automatically adds us to // the Thread_Manager when the thread begins. int count = 1; @@ -80,13 +104,16 @@ Thread_Pool::svc (void) { ACE_Message_Block *mb; - ACE_DEBUG ((LM_DEBUG, "(%t) in iteration %d before getq ()\n", count)); + ACE_DEBUG ((LM_DEBUG, + "(%t) in iteration %d before getq ()\n", + count)); if (this->getq (mb) == -1) { ACE_ERROR ((LM_ERROR, - "(%t) in iteration %d, got result -1, exiting\n", count)); - break; + "(%t) in iteration %d, got result -1, exiting\n", + count)); + break; } int length = mb->length (); @@ -94,7 +121,10 @@ Thread_Pool::svc (void) if (length > 0) ACE_DEBUG ((LM_DEBUG, "(%t) in iteration %d, length = %d, text = \"%*s\"\n", - count, length, length - 1, mb->rd_ptr ())); + count, + length, + length - 1, + mb->rd_ptr ())); // We're responsible for deallocating this. mb->release (); @@ -108,41 +138,51 @@ Thread_Pool::svc (void) } } - // Note that the ACE_Task::svc_run () method automatically removes - // us from the Thread_Manager when the thread exits. + // Note that the <ACE_Task::svc_run> method automatically removes us + // from the <ACE_Thread_Manager> when the thread exits. return 0; } -static void -produce (Thread_Pool &thread_pool) +static void +producer (Thread_Pool &thread_pool) { - ACE_DEBUG ((LM_DEBUG, "(%t) producer start, dumping the Thread_Pool\n")); - thread_pool.dump (); + ACE_DEBUG ((LM_DEBUG, + "(%t) producer start, generating data for the <Thread_Pool>\n")); + // thread_pool.dump (); - for (int n;;) + for (int n; ;) { // Allocate a new message. - ACE_Message_Block *mb = new ACE_Message_Block (BUFSIZ); + ACE_Message_Block *mb; + ACE_NEW (mb, + ACE_Message_Block (BUFSIZ)); -#if defined (manual) - ACE_DEBUG ((LM_DEBUG, - "(%t) press chars and enter to put a new message into task queue...")); - n = ACE_OS::read (ACE_STDIN, mb->rd_ptr (), mb->size ()); -#else // Automatically generate messages. - static int count = 0; + if (manual) + { + ACE_DEBUG ((LM_DEBUG, + "(%t) enter a new message for the task pool...")); + n = ACE_OS::read (ACE_STDIN, + mb->rd_ptr (), + mb->size ()); + } + else + { + static int count = 0; - ACE_OS::sprintf (mb->rd_ptr (), "%d\n", count); + ACE_OS::sprintf (mb->rd_ptr (), + "%d\n", + count); + n = ACE_OS::strlen (mb->rd_ptr ()); - n = ACE_OS::strlen (mb->rd_ptr ()); + if (count == n_iterations) + n = 1; // Indicate that we need to shut down. + else + count++; - if (count == n_iterations) - n = 1; // Indicate that we need to shut down. - else - count++; + if (count == 0 || (count % 20 == 0)) + ACE_OS::sleep (1); + } - if (count == 0 || (count % 20 == 0)) - ACE_OS::sleep (1); -#endif /* manual */ if (n > 1) { // Send a normal message to the waiting threads and continue @@ -151,13 +191,16 @@ produce (Thread_Pool &thread_pool) // Pass the message to the Thread_Pool. if (thread_pool.put (mb) == -1) - ACE_ERROR ((LM_ERROR, " (%t) %p\n", "put")); + ACE_ERROR ((LM_ERROR, + " (%t) %p\n", + "put")); } else { // Send a shutdown message to the waiting threads and exit. - ACE_DEBUG ((LM_DEBUG, "\n(%t) start loop, dump of task:\n")); - thread_pool.dump (); + ACE_DEBUG ((LM_DEBUG, + "\n(%t) start loop, dump of task:\n")); + // thread_pool.dump (); for (int i = thread_pool.thr_count (); i > 0; i--) { @@ -167,12 +210,18 @@ produce (Thread_Pool &thread_pool) // Enqueue a NULL message to flag each consumer to // shutdown. - if (thread_pool.put (new ACE_Message_Block) == -1) - ACE_ERROR ((LM_ERROR, " (%t) %p\n", "put")); + ACE_Message_Block *mb; + ACE_NEW (mb, + ACE_Message_Block); + if (thread_pool.put (mb) == -1) + ACE_ERROR ((LM_ERROR, + " (%t) %p\n", + "put")); } - ACE_DEBUG ((LM_DEBUG, "\n(%t) end loop, dump of task:\n")); - thread_pool.dump (); + ACE_DEBUG ((LM_DEBUG, + "\n(%t) end loop\n")); + // thread_pool.dump (); break; } } @@ -183,30 +232,36 @@ main (int argc, char *argv[]) { int n_threads = argc > 1 ? ACE_OS::atoi (argv[1]) : ACE_DEFAULT_THREADS; n_iterations = argc > 2 ? ACE_OS::atoi (argv[2]) : n_iterations; + manual = argc > 3 ? 1 : 0; - ACE_DEBUG ((LM_DEBUG, "(%t) argc = %d, threads = %d\n", - argc, n_threads)); - - // Create the worker tasks. - Thread_Pool thread_pool (ACE_Thread_Manager::instance (), - n_threads); - - // Create work for the worker tasks to process in their own threads. - produce (thread_pool); - - // Wait for all the threads to reach their exit point. - - ACE_DEBUG ((LM_DEBUG, "(%t) waiting with thread manager...\n")); - ACE_Thread_Manager::instance ()->wait (); - - ACE_DEBUG ((LM_DEBUG, "(%t) destroying worker tasks and exiting...\n")); + ACE_DEBUG ((LM_DEBUG, + "(%t) argc = %d, threads = %d\n", + argc, + n_threads)); + + { + // Create the worker tasks. + Thread_Pool thread_pool (ACE_Thread_Manager::instance (), + n_threads); + + // Create work for the worker tasks to process in their own threads. + producer (thread_pool); + + // Wait for all the threads to reach their exit point. + ACE_DEBUG ((LM_DEBUG, + "(%t) waiting for threads to exit in Thread_Pool destructor...\n")); + } + + ACE_DEBUG ((LM_DEBUG, + "(%t) destroying worker tasks and exiting...\n")); return 0; } #else int main (int, char *[]) { - ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n")); + ACE_ERROR ((LM_ERROR, + "threads not supported on this platform\n")); return 0; } #endif /* ACE_HAS_THREADS */ |