summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorschmidt <douglascraigschmidt@users.noreply.github.com>2000-01-08 22:08:47 +0000
committerschmidt <douglascraigschmidt@users.noreply.github.com>2000-01-08 22:08:47 +0000
commit9689e9d7f495f4a4a00dd2536af628dd39bb96ee (patch)
tree075c88aa547e84b88698421e8d3d584ce2e2156d
parent4e20bf8f28a77b5707a74d9ec53949996b61ee3d (diff)
downloadATCD-9689e9d7f495f4a4a00dd2536af628dd39bb96ee.tar.gz
ChangeLogTag:Sat Jan 8 09:44:51 2000 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu>
-rw-r--r--ChangeLog12
-rw-r--r--ChangeLogs/ChangeLog-02a12
-rw-r--r--ChangeLogs/ChangeLog-03a12
-rw-r--r--ace/Proactor.cpp2
-rw-r--r--ace/Task.cpp1
-rw-r--r--ace/Task_T.cpp13
-rw-r--r--ace/Task_T.h11
-rw-r--r--ace/Thread_Manager.cpp23
-rw-r--r--ace/Thread_Manager.h3
-rw-r--r--examples/Threads/task_four.cpp122
-rw-r--r--examples/Threads/task_three.cpp3
-rw-r--r--examples/Threads/thread_pool.cpp189
12 files changed, 284 insertions, 119 deletions
diff --git a/ChangeLog b/ChangeLog
index 7802d35ecda..f9534e10793 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,5 +1,17 @@
Sat Jan 8 09:44:51 2000 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu>
+ * examples/Threads/thread_pool.cpp: Revised the example to
+ illustrate the new "wait_for_threads_in_destructor" feature of
+ ACE_Task.
+
+ * examples/Threads/task_four.cpp: Reformatted the code.
+
+ * ace/Task_T: Added a new flag to the constructor that enables
+ applications to request that an ACE_Task will wait in its
+ destructor for any and all threads in the task to exit before
+ returning. Thanks to Valery Arkhangorodsky
+ <valerya@servicesoft.com> for suggesting this.
+
* ace/OS.i: Needed to add an extern "C" {} block around the
setregid() and setreuid() functions. Thanks to David Levine
for reporting this.
diff --git a/ChangeLogs/ChangeLog-02a b/ChangeLogs/ChangeLog-02a
index 7802d35ecda..f9534e10793 100644
--- a/ChangeLogs/ChangeLog-02a
+++ b/ChangeLogs/ChangeLog-02a
@@ -1,5 +1,17 @@
Sat Jan 8 09:44:51 2000 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu>
+ * examples/Threads/thread_pool.cpp: Revised the example to
+ illustrate the new "wait_for_threads_in_destructor" feature of
+ ACE_Task.
+
+ * examples/Threads/task_four.cpp: Reformatted the code.
+
+ * ace/Task_T: Added a new flag to the constructor that enables
+ applications to request that an ACE_Task will wait in its
+ destructor for any and all threads in the task to exit before
+ returning. Thanks to Valery Arkhangorodsky
+ <valerya@servicesoft.com> for suggesting this.
+
* ace/OS.i: Needed to add an extern "C" {} block around the
setregid() and setreuid() functions. Thanks to David Levine
for reporting this.
diff --git a/ChangeLogs/ChangeLog-03a b/ChangeLogs/ChangeLog-03a
index 7802d35ecda..f9534e10793 100644
--- a/ChangeLogs/ChangeLog-03a
+++ b/ChangeLogs/ChangeLog-03a
@@ -1,5 +1,17 @@
Sat Jan 8 09:44:51 2000 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu>
+ * examples/Threads/thread_pool.cpp: Revised the example to
+ illustrate the new "wait_for_threads_in_destructor" feature of
+ ACE_Task.
+
+ * examples/Threads/task_four.cpp: Reformatted the code.
+
+ * ace/Task_T: Added a new flag to the constructor that enables
+ applications to request that an ACE_Task will wait in its
+ destructor for any and all threads in the task to exit before
+ returning. Thanks to Valery Arkhangorodsky
+ <valerya@servicesoft.com> for suggesting this.
+
* ace/OS.i: Needed to add an extern "C" {} block around the
setregid() and setreuid() functions. Thanks to David Levine
for reporting this.
diff --git a/ace/Proactor.cpp b/ace/Proactor.cpp
index 6089b432e8f..477762498e0 100644
--- a/ace/Proactor.cpp
+++ b/ace/Proactor.cpp
@@ -275,7 +275,7 @@ ACE_Proactor::ACE_Proactor (ACE_Proactor_Impl *implementation,
ACE_NEW (this->timer_handler_,
ACE_Proactor_Timer_Handler (*this));
- // Activate <timer_handler>
+ // Activate <timer_handler>.
if (this->timer_handler_->activate (THR_NEW_LWP | THR_DETACHED) == -1)
ACE_ERROR ((LM_ERROR,
ASYS_TEXT ("%N:%l:(%P | %t):%p\n"),
diff --git a/ace/Task.cpp b/ace/Task.cpp
index 20f4d7d5595..91a7588d65b 100644
--- a/ace/Task.cpp
+++ b/ace/Task.cpp
@@ -26,6 +26,7 @@ ACE_Task_Base::ACE_Task_Base (ACE_Thread_Manager *thr_man)
}
// Wait for all threads running in a task to exit.
+
int
ACE_Task_Base::wait (void)
{
diff --git a/ace/Task_T.cpp b/ace/Task_T.cpp
index 17c4612e67e..88a9354396b 100644
--- a/ace/Task_T.cpp
+++ b/ace/Task_T.cpp
@@ -45,12 +45,14 @@ ACE_Task<ACE_SYNCH_USE>::dump (void) const
template<ACE_SYNCH_DECL>
ACE_Task<ACE_SYNCH_USE>::ACE_Task (ACE_Thread_Manager *thr_man,
- ACE_Message_Queue<ACE_SYNCH_USE> *mq)
+ ACE_Message_Queue<ACE_SYNCH_USE> *mq,
+ int wait_for_threads_in_destructor)
: ACE_Task_Base (thr_man),
msg_queue_ (0),
delete_msg_queue_ (0),
mod_ (0),
- next_ (0)
+ next_ (0),
+ wait_for_threads_in_destructor_ (wait_for_threads_in_destructor)
{
ACE_TRACE ("ACE_Task<ACE_SYNCH_USE>::ACE_Task");
@@ -68,6 +70,13 @@ template<ACE_SYNCH_DECL>
ACE_Task<ACE_SYNCH_USE>::~ACE_Task (void)
{
ACE_TRACE ("ACE_Task<ACE_SYNCH_USE>::~ACE_Task");
+ // Wait for any and all threads in this task to exit.
+ if (this->wait_for_threads_in_destructor_ != 0
+ && this->wait () == -1)
+ ACE_ERROR ((LM_ERROR,
+ ASYS_TEXT ("(%t) %p\n"),
+ ASYS_TEXT ("wait() failed in ~ACE_Task_Base")));
+
if (this->delete_msg_queue_)
delete this->msg_queue_;
diff --git a/ace/Task_T.h b/ace/Task_T.h
index 0fb31c4aa78..f0950e91a0e 100644
--- a/ace/Task_T.h
+++ b/ace/Task_T.h
@@ -45,11 +45,14 @@ public:
// = Initialization/termination methods.
ACE_Task (ACE_Thread_Manager *thr_mgr = 0,
- ACE_Message_Queue<ACE_SYNCH_USE> *mq = 0);
+ ACE_Message_Queue<ACE_SYNCH_USE> *mq = 0,
+ int wait_for_threads_in_destructor = 0);
// Initialize a Task, supplying a thread manager and a message
// queue. If the user doesn't supply a ACE_Message_Queue pointer
// then we'll allocate one dynamically. Otherwise, we'll use the
- // one they give.
+ // one passed as a parameter. If <wait_for_threads_in_destructor>
+ // is non-0 then <~ACE_Task> will wait for any threads in this task
+ // to exit before returning.
virtual ~ACE_Task (void);
// Destructor.
@@ -137,6 +140,10 @@ public: // Should be protected:
ACE_Task<ACE_SYNCH_USE> *next_;
// Pointer to adjacent ACE_Task.
+ int wait_for_threads_in_destructor_;
+ // If this is non-0 then wait for threads in this task to exit
+ // before returning from the destructor.
+
void dump (void) const;
// Dump the state of an object.
diff --git a/ace/Thread_Manager.cpp b/ace/Thread_Manager.cpp
index 7447f3bdf8b..bcce9716399 100644
--- a/ace/Thread_Manager.cpp
+++ b/ace/Thread_Manager.cpp
@@ -1902,7 +1902,7 @@ ACE_Thread_Manager::apply_task (ACE_Task_Base *task,
return result;
}
-// Wait for task
+// Wait for all threads to exit a task.
int
ACE_Thread_Manager::wait_task (ACE_Task_Base *task)
@@ -1911,7 +1911,7 @@ ACE_Thread_Manager::wait_task (ACE_Task_Base *task)
ACE_Thread_Descriptor_Base *copy_table = 0;
// We have to make sure that while we wait for these threads to
- // exit, we do not have the lock. Therefore we make a copy of all
+ // exit, we do not have the lock. Therefore we make a copy of all
// interesting entries and let go of the lock.
{
ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1));
@@ -1930,12 +1930,16 @@ ACE_Thread_Manager::wait_task (ACE_Task_Base *task)
for (ACE_Double_Linked_List_Iterator<ACE_Thread_Descriptor> iter (this->thr_list_);
!iter.done ();
iter.advance ())
- // If threads are created as THR_DETACHED or THR_DAEMON, we can't help much here.
+ // If threads are created as THR_DETACHED or THR_DAEMON, we
+ // can't wait on them here.
if (iter.next ()->task_ == task &&
- (ACE_BIT_DISABLED (iter.next ()->flags_, THR_DETACHED | THR_DAEMON)
- || ACE_BIT_ENABLED (iter.next ()->flags_, THR_JOINABLE)))
+ (ACE_BIT_DISABLED (iter.next ()->flags_,
+ THR_DETACHED | THR_DAEMON)
+ || ACE_BIT_ENABLED (iter.next ()->flags_,
+ THR_JOINABLE)))
{
- ACE_SET_BITS (iter.next ()->thr_state_, ACE_THR_JOINING);
+ ACE_SET_BITS (iter.next ()->thr_state_,
+ ACE_THR_JOINING);
copy_table[copy_count++] = *iter.next ();
}
@@ -1946,7 +1950,8 @@ ACE_Thread_Manager::wait_task (ACE_Task_Base *task)
// If threads are created as THR_DETACHED or THR_DAEMON, we can't help much here.
if (titer.next ()->task_ == task)
{
- ACE_Thread_Descriptor_Base *tdb = titer.advance_and_remove (0);
+ ACE_Thread_Descriptor_Base *tdb =
+ titer.advance_and_remove (0);
copy_table[copy_count++] = *tdb;
delete tdb;
}
@@ -1956,7 +1961,9 @@ ACE_Thread_Manager::wait_task (ACE_Task_Base *task)
// Now to do the actual work
int result = 0;
- for (int i = 0; i < copy_count && result != -1; i++)
+ for (int i = 0;
+ i < copy_count && result != -1;
+ i++)
{
if (ACE_Thread::join (copy_table[i].thr_handle_) == -1)
result = -1;
diff --git a/ace/Thread_Manager.h b/ace/Thread_Manager.h
index 08c85fabaac..d35afbedef4 100644
--- a/ace/Thread_Manager.h
+++ b/ace/Thread_Manager.h
@@ -603,9 +603,10 @@ public:
// resembles <suspend_thr>.
// = Operations on ACE_Tasks.
+
int wait_task (ACE_Task_Base *task);
// Block until there are no more threads running in <task>. Returns
- // 0 on success and -1 on failure. Notice that wait_task will not
+ // 0 on success and -1 on failure. Note that <wait_task> will not
// wait on detached threads.
int suspend_task (ACE_Task_Base *task);
// Suspend all threads in an ACE_Task.
diff --git a/examples/Threads/task_four.cpp b/examples/Threads/task_four.cpp
index 177a9950b7b..42ad56d9c06 100644
--- a/examples/Threads/task_four.cpp
+++ b/examples/Threads/task_four.cpp
@@ -1,7 +1,7 @@
// $Id$
// The following test was written by Hamutal Yanay & Ari Erev's
-// (Ari_Erev@comverse.com).
+// (Ari_Erev@comverse.com).
//
// This test program test enhancements to the thread_manager and task
// classes. The purpose of these enhancements was to allow the
@@ -85,7 +85,9 @@ size_t Worker_Task::workers_count_ = 1;
int
Worker_Task::close (u_long)
{
- ACE_DEBUG ((LM_DEBUG, "(%t) closing task %d\n", this->index_));
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) closing task %d\n",
+ this->index_));
delete this;
return 0;
}
@@ -103,24 +105,35 @@ Worker_Task::Worker_Task (ACE_Thread_Manager *thr_mgr,
int
Worker_Task::open (void *)
{
- // Create worker threads.
- return this->activate (THR_NEW_LWP, n_threads_, 0, -1, -1, this);
+ // Create the pool of worker threads.
+ return this->activate (THR_NEW_LWP,
+ n_threads_,
+ 0,
+ -1,
+ -1,
+ this);
}
int
Worker_Task::svc (void)
{
- ACE_DEBUG ((LM_DEBUG, " (%t) in worker %d\n", index_));
+ ACE_DEBUG ((LM_DEBUG,
+ " (%t) in worker %d\n",
+ index_));
for (size_t iterations = 1;
iterations <= this->n_iterations_;
iterations++)
{
- ACE_DEBUG ((LM_DEBUG, " (%t) in iteration %d\n", iterations));
+ ACE_DEBUG ((LM_DEBUG,
+ " (%t) in iteration %d\n",
+ iterations));
ACE_OS::sleep (0);
}
- ACE_DEBUG ((LM_DEBUG, " (%t) worker %d ends\n", index_));
+ ACE_DEBUG ((LM_DEBUG,
+ " (%t) worker %d ends\n",
+ index_));
return 0;
}
@@ -134,9 +147,16 @@ Invoker_Task::Invoker_Task (ACE_Thread_Manager *thr_mgr,
n_threads_ (n_threads),
n_iterations_ (n_iterations)
{
- // Create worker threads.
- if (this->activate (THR_NEW_LWP, 1, 0, -1, -1, this) == -1)
- ACE_ERROR ((LM_ERROR, "%p\n", "activate failed"));
+ // Create a single worker thread.
+ if (this->activate (THR_NEW_LWP,
+ 1,
+ 0,
+ -1,
+ -1,
+ this) == -1)
+ ACE_ERROR ((LM_ERROR,
+ "%p\n",
+ "activate failed"));
}
// Iterate <n_iterations> time printing off a message and "waiting"
@@ -145,64 +165,84 @@ Invoker_Task::Invoker_Task (ACE_Thread_Manager *thr_mgr,
int
Invoker_Task::svc (void)
{
- // Note that the ACE_Task::svc_run () method automatically adds us to
- // the Thread_Manager when the thread begins.
+ // Note that the ACE_Task::svc_run () method automatically adds us
+ // to the Thread_Manager when the thread begins.
- ACE_Thread_Manager *thr_mgr = ACE_Thread_Manager::instance ();
+ ACE_Thread_Manager *thr_mgr =
+ ACE_Thread_Manager::instance ();
Worker_Task **worker_task;
- ACE_NEW_RETURN (worker_task, Worker_Task *[n_tasks_], -1);
-
+ ACE_NEW_RETURN (worker_task,
+ Worker_Task *[n_tasks_],
+ -1);
size_t task;
for (task = 0;
task < this->n_tasks_;
task++)
{
- ACE_DEBUG ((LM_DEBUG, " (%t) in task %d\n", task+1));
+ ACE_DEBUG ((LM_DEBUG,
+ " (%t) in task %d\n",
+ task + 1));
ACE_NEW_RETURN (worker_task[task],
- Worker_Task (thr_mgr, n_threads_, n_iterations_),
+ Worker_Task (thr_mgr,
+ n_threads_,
+ n_iterations_),
-1);
if (worker_task[task]->open () == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open failed"), -1);
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "open failed"),
+ -1);
}
// Set all tasks to be one group
- ACE_DEBUG ((LM_DEBUG, " (%t) setting tasks group id\n"));
+ ACE_DEBUG ((LM_DEBUG,
+ " (%t) setting tasks group id\n"));
for (task = 0;
task < this->n_tasks_;
task++)
- if (thr_mgr->set_grp (worker_task[task], 1) == -1)
- ACE_ERROR ((LM_DEBUG, " (%t) %p\n", "set_grp"));
-
- size_t n_tasks = thr_mgr->num_tasks_in_group (1);
- ACE_DEBUG ((LM_DEBUG, "Number of tasks in group 1: %d\n", n_tasks)) ;
+ if (thr_mgr->set_grp (worker_task[task],
+ 1) == -1)
+ ACE_ERROR ((LM_DEBUG,
+ " (%t) %p\n",
+ "set_grp"));
+
+ size_t n_tasks =
+ thr_mgr->num_tasks_in_group (1);
+ ACE_DEBUG ((LM_DEBUG,
+ "Number of tasks in group 1: %d\n",
+ n_tasks)) ;
// Wait for 1 second and then suspend every thread in the group.
ACE_OS::sleep (1);
- ACE_DEBUG ((LM_DEBUG, " (%t) suspending group\n"));
+ ACE_DEBUG ((LM_DEBUG,
+ " (%t) suspending group\n"));
if (thr_mgr->suspend_grp (1) == -1)
- ACE_ERROR ((LM_DEBUG, " (%t) %p\n", "suspend_grp"));
+ ACE_ERROR ((LM_DEBUG,
+ " (%t) %p\n",
+ "suspend_grp"));
- // Wait for 3 more second and then resume every thread in the
- // group.
+ // Wait for 3 more second and then resume every thread in the group.
ACE_OS::sleep (ACE_Time_Value (2));
- ACE_DEBUG ((LM_DEBUG, " (%t) resuming group\n"));
+ ACE_DEBUG ((LM_DEBUG,
+ " (%t) resuming group\n"));
if (thr_mgr->resume_grp (1) == -1)
- ACE_ERROR ((LM_DEBUG, " (%t) %p\n", "resume_grp"));
+ ACE_ERROR ((LM_DEBUG,
+ " (%t) %p\n",
+ "resume_grp"));
// Wait for all the tasks to reach their exit point.
thr_mgr->wait ();
// Note that the ACE_Task::svc_run () method automatically removes
// us from the Thread_Manager when the thread exits.
-
return 0;
}
@@ -228,30 +268,38 @@ main (int argc, char *argv[])
// Wait for 1 second and then suspend the invoker task
ACE_OS::sleep (1);
- ACE_DEBUG ((LM_DEBUG, " (%t) suspending invoker task\n"));
+ ACE_DEBUG ((LM_DEBUG,
+ " (%t) suspending invoker task\n"));
if (invoker_manager.suspend_task (&invoker) == -1)
- ACE_ERROR ((LM_DEBUG, " (%t) %p\n", "suspend_task"));
+ ACE_ERROR ((LM_DEBUG,
+ " (%t) %p\n",
+ "suspend_task"));
// Wait for 3 more second and then resume the invoker task.
ACE_OS::sleep (ACE_Time_Value (3));
- ACE_DEBUG ((LM_DEBUG, " (%t) resuming invoker task\n"));
+ ACE_DEBUG ((LM_DEBUG,
+ " (%t) resuming invoker task\n"));
if (invoker_manager.resume_task (&invoker) == -1)
- ACE_ERROR ((LM_DEBUG, " (%t) %p\n", "resume_task"));
+ ACE_ERROR ((LM_DEBUG,
+ " (%t) %p\n",
+ "resume_task"));
// Wait for all the threads to reach their exit point.
invoker_manager.wait ();
- ACE_DEBUG ((LM_DEBUG, " (%t) done\n"));
+ ACE_DEBUG ((LM_DEBUG,
+ " (%t) done\n"));
return 0;
}
#else
int
main (int, char *[])
{
- ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n"));
+ ACE_ERROR ((LM_ERROR,
+ "threads not supported on this platform\n"));
return 0;
}
#endif /* ACE_HAS_THREADS */
diff --git a/examples/Threads/task_three.cpp b/examples/Threads/task_three.cpp
index b6305330195..0bda095f67c 100644
--- a/examples/Threads/task_three.cpp
+++ b/examples/Threads/task_three.cpp
@@ -195,7 +195,8 @@ main (int argc, char **)
Test_Task t1[TASK_COUNT];
Test_Task t2[TASK_COUNT];
- ACE_Thread::spawn (ACE_THR_FUNC (dispatch), reactor2);
+ ACE_Thread::spawn (ACE_THR_FUNC (dispatch),
+ reactor2);
reactor1->owner (ACE_OS::thr_self ());
diff --git a/examples/Threads/thread_pool.cpp b/examples/Threads/thread_pool.cpp
index 3ec15e51b46..8bd09996ed8 100644
--- a/examples/Threads/thread_pool.cpp
+++ b/examples/Threads/thread_pool.cpp
@@ -2,13 +2,14 @@
// This test program illustrates how the ACE task synchronization
// mechanisms work in conjunction with the ACE_Task and the
-// ACE_Thread_Manager. If the manual flag is not set input comes from
-// stdin until the user enters a return only. This stops all workers
-// via a message block of length 0. This is an alternative shutdown of
-// workers compared to queue deactivate.
+// ACE_Thread_Manager. If the <manual> flag is set input comes from
+// stdin until the user enters a return -- otherwise, the input is
+// generated automatically. All worker threads shutdown when they
+// receive a message block of length 0.
//
// This code is original based on a test program written by Karlheinz
-// Dorn. It was modified to utilize more "ACE" features by Doug Schmidt.
+// Dorn <Karlheinz.Dorn@med.siemens.de>. It was modified to utilize
+// more ACE features by Doug Schmidt <schmidt@cs.wustl.edu>.
#include "ace/Task.h"
#include "ace/Service_Config.h"
@@ -19,45 +20,68 @@ ACE_RCSID(Threads, thread_pool, "$Id$")
#if defined (ACE_HAS_THREADS)
-// Number of iterations to run the test.
+// Default number of iterations to run the test.
static int n_iterations = 100;
+// Controls whether the input is generated "manually" or automatically.
+static int manual = 0;
+
class Thread_Pool : public ACE_Task<ACE_MT_SYNCH>
{
+ // = TITLE
+ // Defines a thread pool abstraction based on the <ACE_Task>.
public:
- Thread_Pool (ACE_Thread_Manager *thr_mgr, int n_threads);
-
+ Thread_Pool (ACE_Thread_Manager *thr_mgr,
+ int n_threads);
+ // Constructor activates <n_threads> in the thread pool.
+
+ ~Thread_Pool (void);
+ // Destructor...
+
virtual int svc (void);
// Iterate <n_iterations> time printing off a message and "waiting"
// for all other threads to complete this iteration.
- virtual int put (ACE_Message_Block *mb, ACE_Time_Value *tv=0);
+ virtual int put (ACE_Message_Block *mb,
+ ACE_Time_Value *tv = 0);
// This allows the producer to pass messages to the <Thread_Pool>.
private:
virtual int close (u_long);
+ // Close hook.
};
int
Thread_Pool::close (u_long)
{
- ACE_DEBUG ((LM_DEBUG, "(%t) close of worker\n"));
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) worker thread closing down\n"));
return 0;
}
Thread_Pool::Thread_Pool (ACE_Thread_Manager *thr_mgr,
int n_threads)
- : ACE_Task<ACE_MT_SYNCH> (thr_mgr)
+ : ACE_Task<ACE_MT_SYNCH> (thr_mgr,
+ 0,
+ 1) // Wait in destructor for threads to exit...
+{
+ // Create the pool of worker threads.
+ if (this->activate (THR_NEW_LWP,
+ n_threads) == -1)
+ ACE_ERROR ((LM_ERROR,
+ "%p\n",
+ "activate failed"));
+}
+
+Thread_Pool::~Thread_Pool (void)
{
- // Create worker threads.
- if (this->activate (THR_NEW_LWP, n_threads) == -1)
- ACE_ERROR ((LM_ERROR, "%p\n", "activate failed"));
}
// Simply enqueue the Message_Block into the end of the queue.
int
-Thread_Pool::put (ACE_Message_Block *mb, ACE_Time_Value *tv)
+Thread_Pool::put (ACE_Message_Block *mb,
+ ACE_Time_Value *tv)
{
return this->putq (mb, tv);
}
@@ -68,7 +92,7 @@ Thread_Pool::put (ACE_Message_Block *mb, ACE_Time_Value *tv)
int
Thread_Pool::svc (void)
{
- // Note that the ACE_Task::svc_run () method automatically adds us to
+ // Note that the <ACE_Task::svc_run> method automatically adds us to
// the Thread_Manager when the thread begins.
int count = 1;
@@ -80,13 +104,16 @@ Thread_Pool::svc (void)
{
ACE_Message_Block *mb;
- ACE_DEBUG ((LM_DEBUG, "(%t) in iteration %d before getq ()\n", count));
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) in iteration %d before getq ()\n",
+ count));
if (this->getq (mb) == -1)
{
ACE_ERROR ((LM_ERROR,
- "(%t) in iteration %d, got result -1, exiting\n", count));
- break;
+ "(%t) in iteration %d, got result -1, exiting\n",
+ count));
+ break;
}
int length = mb->length ();
@@ -94,7 +121,10 @@ Thread_Pool::svc (void)
if (length > 0)
ACE_DEBUG ((LM_DEBUG,
"(%t) in iteration %d, length = %d, text = \"%*s\"\n",
- count, length, length - 1, mb->rd_ptr ()));
+ count,
+ length,
+ length - 1,
+ mb->rd_ptr ()));
// We're responsible for deallocating this.
mb->release ();
@@ -108,41 +138,51 @@ Thread_Pool::svc (void)
}
}
- // Note that the ACE_Task::svc_run () method automatically removes
- // us from the Thread_Manager when the thread exits.
+ // Note that the <ACE_Task::svc_run> method automatically removes us
+ // from the <ACE_Thread_Manager> when the thread exits.
return 0;
}
-static void
-produce (Thread_Pool &thread_pool)
+static void
+producer (Thread_Pool &thread_pool)
{
- ACE_DEBUG ((LM_DEBUG, "(%t) producer start, dumping the Thread_Pool\n"));
- thread_pool.dump ();
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) producer start, generating data for the <Thread_Pool>\n"));
+ // thread_pool.dump ();
- for (int n;;)
+ for (int n; ;)
{
// Allocate a new message.
- ACE_Message_Block *mb = new ACE_Message_Block (BUFSIZ);
+ ACE_Message_Block *mb;
+ ACE_NEW (mb,
+ ACE_Message_Block (BUFSIZ));
-#if defined (manual)
- ACE_DEBUG ((LM_DEBUG,
- "(%t) press chars and enter to put a new message into task queue..."));
- n = ACE_OS::read (ACE_STDIN, mb->rd_ptr (), mb->size ());
-#else // Automatically generate messages.
- static int count = 0;
+ if (manual)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) enter a new message for the task pool..."));
+ n = ACE_OS::read (ACE_STDIN,
+ mb->rd_ptr (),
+ mb->size ());
+ }
+ else
+ {
+ static int count = 0;
- ACE_OS::sprintf (mb->rd_ptr (), "%d\n", count);
+ ACE_OS::sprintf (mb->rd_ptr (),
+ "%d\n",
+ count);
+ n = ACE_OS::strlen (mb->rd_ptr ());
- n = ACE_OS::strlen (mb->rd_ptr ());
+ if (count == n_iterations)
+ n = 1; // Indicate that we need to shut down.
+ else
+ count++;
- if (count == n_iterations)
- n = 1; // Indicate that we need to shut down.
- else
- count++;
+ if (count == 0 || (count % 20 == 0))
+ ACE_OS::sleep (1);
+ }
- if (count == 0 || (count % 20 == 0))
- ACE_OS::sleep (1);
-#endif /* manual */
if (n > 1)
{
// Send a normal message to the waiting threads and continue
@@ -151,13 +191,16 @@ produce (Thread_Pool &thread_pool)
// Pass the message to the Thread_Pool.
if (thread_pool.put (mb) == -1)
- ACE_ERROR ((LM_ERROR, " (%t) %p\n", "put"));
+ ACE_ERROR ((LM_ERROR,
+ " (%t) %p\n",
+ "put"));
}
else
{
// Send a shutdown message to the waiting threads and exit.
- ACE_DEBUG ((LM_DEBUG, "\n(%t) start loop, dump of task:\n"));
- thread_pool.dump ();
+ ACE_DEBUG ((LM_DEBUG,
+ "\n(%t) start loop, dump of task:\n"));
+ // thread_pool.dump ();
for (int i = thread_pool.thr_count (); i > 0; i--)
{
@@ -167,12 +210,18 @@ produce (Thread_Pool &thread_pool)
// Enqueue a NULL message to flag each consumer to
// shutdown.
- if (thread_pool.put (new ACE_Message_Block) == -1)
- ACE_ERROR ((LM_ERROR, " (%t) %p\n", "put"));
+ ACE_Message_Block *mb;
+ ACE_NEW (mb,
+ ACE_Message_Block);
+ if (thread_pool.put (mb) == -1)
+ ACE_ERROR ((LM_ERROR,
+ " (%t) %p\n",
+ "put"));
}
- ACE_DEBUG ((LM_DEBUG, "\n(%t) end loop, dump of task:\n"));
- thread_pool.dump ();
+ ACE_DEBUG ((LM_DEBUG,
+ "\n(%t) end loop\n"));
+ // thread_pool.dump ();
break;
}
}
@@ -183,30 +232,36 @@ main (int argc, char *argv[])
{
int n_threads = argc > 1 ? ACE_OS::atoi (argv[1]) : ACE_DEFAULT_THREADS;
n_iterations = argc > 2 ? ACE_OS::atoi (argv[2]) : n_iterations;
+ manual = argc > 3 ? 1 : 0;
- ACE_DEBUG ((LM_DEBUG, "(%t) argc = %d, threads = %d\n",
- argc, n_threads));
-
- // Create the worker tasks.
- Thread_Pool thread_pool (ACE_Thread_Manager::instance (),
- n_threads);
-
- // Create work for the worker tasks to process in their own threads.
- produce (thread_pool);
-
- // Wait for all the threads to reach their exit point.
-
- ACE_DEBUG ((LM_DEBUG, "(%t) waiting with thread manager...\n"));
- ACE_Thread_Manager::instance ()->wait ();
-
- ACE_DEBUG ((LM_DEBUG, "(%t) destroying worker tasks and exiting...\n"));
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) argc = %d, threads = %d\n",
+ argc,
+ n_threads));
+
+ {
+ // Create the worker tasks.
+ Thread_Pool thread_pool (ACE_Thread_Manager::instance (),
+ n_threads);
+
+ // Create work for the worker tasks to process in their own threads.
+ producer (thread_pool);
+
+ // Wait for all the threads to reach their exit point.
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) waiting for threads to exit in Thread_Pool destructor...\n"));
+ }
+
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) destroying worker tasks and exiting...\n"));
return 0;
}
#else
int
main (int, char *[])
{
- ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n"));
+ ACE_ERROR ((LM_ERROR,
+ "threads not supported on this platform\n"));
return 0;
}
#endif /* ACE_HAS_THREADS */