summaryrefslogtreecommitdiff
path: root/ACE/examples/APG/ThreadPools/ThreadPool.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ACE/examples/APG/ThreadPools/ThreadPool.cpp')
-rw-r--r--ACE/examples/APG/ThreadPools/ThreadPool.cpp271
1 files changed, 271 insertions, 0 deletions
diff --git a/ACE/examples/APG/ThreadPools/ThreadPool.cpp b/ACE/examples/APG/ThreadPools/ThreadPool.cpp
new file mode 100644
index 00000000000..684762efcbf
--- /dev/null
+++ b/ACE/examples/APG/ThreadPools/ThreadPool.cpp
@@ -0,0 +1,271 @@
+// $Id$
+
+#include "ace/config-lite.h"
+#if defined (ACE_HAS_THREADS)
+
+#include "ace/OS_NS_string.h"
+#include "ace/OS_NS_time.h"
+#include "ace/Task.h"
+#include "ace/Containers.h"
+#include "ace/Synch.h"
+#include "ace/SString.h"
+#include "ace/Method_Request.h"
+#include "ace/Future.h"
+#include "ace/Activation_Queue.h"
+
+class Worker;
+
+class IManager
+{
+public:
+ virtual ~IManager (void) { }
+
+ virtual int return_to_work (Worker *worker) = 0;
+};
+
+// Listing 2 code/ch16
+class Worker : public ACE_Task<ACE_MT_SYNCH>
+{
+public:
+ Worker (IManager *manager) : manager_(manager) { }
+
+ virtual int svc (void)
+ {
+ thread_id_ = ACE_Thread::self ();
+ while (1)
+ {
+ ACE_Message_Block *mb = 0;
+ if (this->getq (mb) == -1)
+ ACE_ERROR_BREAK
+ ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("getq")));
+ if (mb->msg_type () == ACE_Message_Block::MB_HANGUP)
+ {
+ ACE_DEBUG ((LM_INFO,
+ ACE_TEXT ("(%t) Shutting down\n")));
+ mb->release ();
+ break;
+ }
+ // Process the message.
+ process_message (mb);
+ // Return to work.
+ this->manager_->return_to_work (this);
+ }
+
+ return 0;
+ }
+ // Listing 2
+
+ ACE_thread_t thread_id (void)
+ {
+ return thread_id_;
+ }
+
+private:
+ void process_message (ACE_Message_Block *mb)
+ {
+ ACE_TRACE (ACE_TEXT ("Worker::process_message"));
+ int msgId;
+ ACE_OS::memcpy (&msgId, mb->rd_ptr (), sizeof(int));
+ mb->release ();
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) Started processing message %d\n"),
+ msgId));
+ ACE_OS::sleep (3);
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) Finished processing message %d\n"),
+ msgId));
+ }
+
+ IManager *manager_;
+ ACE_thread_t thread_id_;
+};
+
+// Listing 1 code/ch16
+class Manager: public ACE_Task<ACE_MT_SYNCH>, private IManager
+{
+public:
+ enum {POOL_SIZE = 5, MAX_TIMEOUT = 5};
+
+ Manager ()
+ : shutdown_(0), workers_lock_(), workers_cond_(workers_lock_)
+ {
+ ACE_TRACE (ACE_TEXT ("Manager::Manager"));
+ }
+
+ int svc (void)
+ {
+ ACE_TRACE (ACE_TEXT ("Manager::svc"));
+
+ ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t) Manager started\n")));
+
+ // Create pool.
+ create_worker_pool ();
+
+ while (!done ())
+ {
+ ACE_Message_Block *mb = 0;
+ ACE_Time_Value tv ((long)MAX_TIMEOUT);
+ tv += ACE_OS::time (0);
+
+ // Get a message request.
+ if (this->getq (mb, &tv) < 0)
+ {
+ shut_down ();
+ break;
+ }
+
+ // Choose a worker.
+ Worker *worker = 0;
+ {
+ ACE_GUARD_RETURN (ACE_Thread_Mutex,
+ worker_mon, this->workers_lock_, -1);
+
+ while (this->workers_.is_empty ())
+ workers_cond_.wait ();
+
+ this->workers_.dequeue_head (worker);
+ }
+
+ // Ask the worker to do the job.
+ worker->putq (mb);
+ }
+
+ return 0;
+ }
+
+ int shut_down (void);
+
+ ACE_thread_t thread_id (Worker *worker);
+
+ virtual int return_to_work (Worker *worker)
+ {
+ ACE_GUARD_RETURN (ACE_Thread_Mutex,
+ worker_mon, this->workers_lock_, -1);
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) Worker %d returning to work.\n"),
+ worker->thr_mgr ()->thr_self ()));
+ this->workers_.enqueue_tail (worker);
+ this->workers_cond_.signal ();
+
+ return 0;
+ }
+
+private:
+ int create_worker_pool (void)
+ {
+ ACE_GUARD_RETURN (ACE_Thread_Mutex,
+ worker_mon,
+ this->workers_lock_,
+ -1);
+ for (int i = 0; i < POOL_SIZE; i++)
+ {
+ Worker *worker;
+ ACE_NEW_RETURN (worker, Worker (this), -1);
+ this->workers_.enqueue_tail (worker);
+ worker->activate ();
+ }
+
+ return 0;
+ }
+
+ int done (void);
+
+private:
+ int shutdown_;
+ ACE_Thread_Mutex workers_lock_;
+ ACE_Condition<ACE_Thread_Mutex> workers_cond_;
+ ACE_Unbounded_Queue<Worker* > workers_;
+};
+// Listing 1
+
+int Manager::done (void)
+{
+ return (shutdown_ == 1);
+}
+
+int
+Manager::shut_down (void)
+{
+ ACE_TRACE (ACE_TEXT ("Manager::shut_down"));
+ ACE_Unbounded_Queue<Worker* >::ITERATOR iter =
+ this->workers_.begin ();
+ Worker **worker_ptr = 0;
+ do
+ {
+ iter.next (worker_ptr);
+ Worker *worker = (*worker_ptr);
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) Attempting shutdown of %d\n"),
+ thread_id (worker)));
+
+ // Send the hangup message.
+ ACE_Message_Block *mb;
+ ACE_NEW_RETURN
+ (mb,
+ ACE_Message_Block(0,
+ ACE_Message_Block::MB_HANGUP),
+ -1);
+ worker->putq (mb);
+
+ // Wait for the exit.
+ worker->wait ();
+
+ ACE_ASSERT (worker->msg_queue ()->is_empty ());
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) Worker %d shut down.\n)"),
+ thread_id (worker)));
+ delete worker;
+ }
+ while (iter.advance ());
+
+ shutdown_ = 1;
+
+ return 0;
+}
+
+ACE_thread_t
+Manager::thread_id (Worker *worker)
+{
+ return worker->thread_id ();
+}
+
+
+int ACE_TMAIN (int, ACE_TCHAR *[])
+{
+ Manager tp;
+ tp.activate ();
+
+ // Wait for a moment every time you send a message.
+ ACE_Time_Value tv;
+ tv.msec (100);
+
+ ACE_Message_Block *mb;
+ for (int i = 0; i < 30; i++)
+ {
+ ACE_NEW_RETURN
+ (mb, ACE_Message_Block(sizeof(int)), -1);
+
+ ACE_OS::memcpy (mb->wr_ptr (), &i, sizeof(int));
+
+ ACE_OS::sleep (tv);
+
+ // Add a new work item.
+ tp.putq (mb);
+ }
+
+ ACE_Thread_Manager::instance ()->wait ();
+ return 0;
+}
+
+#else
+#include "ace/OS_main.h"
+#include "ace/OS_NS_stdio.h"
+
+int ACE_TMAIN (int, ACE_TCHAR *[])
+{
+ ACE_OS::puts (ACE_TEXT ("This example requires threads."));
+ return 0;
+}
+
+#endif /* ACE_HAS_THREADS */