diff options
Diffstat (limited to 'examples/APG/ThreadPools/ThreadPool.cpp')
-rw-r--r-- | examples/APG/ThreadPools/ThreadPool.cpp | 283 |
1 files changed, 0 insertions, 283 deletions
diff --git a/examples/APG/ThreadPools/ThreadPool.cpp b/examples/APG/ThreadPools/ThreadPool.cpp deleted file mode 100644 index 92b8146a9e5..00000000000 --- a/examples/APG/ThreadPools/ThreadPool.cpp +++ /dev/null @@ -1,283 +0,0 @@ -// $Id$ - -#include "ace/config-lite.h" -#if defined (ACE_HAS_THREADS) - -#include "ace/OS_NS_string.h" -#include "ace/OS_NS_time.h" -#include "ace/Task.h" -#include "ace/Containers.h" -#include "ace/Synch.h" -#include "ace/SString.h" -#include "ace/Method_Request.h" -#include "ace/Future.h" -#include "ace/Activation_Queue.h" - -class Worker; - -class IManager -{ -public: - virtual ~IManager (void) { } - - virtual int return_to_work (Worker *worker) = 0; -}; - -// Listing 2 code/ch16 -class Worker : public ACE_Task<ACE_MT_SYNCH> -{ -public: - Worker (IManager *manager) : manager_(manager) { } - - virtual int svc (void) - { - thread_id_ = ACE_Thread::self (); - while (1) - { - ACE_Message_Block *mb = 0; - if (this->getq (mb) == -1) - ACE_ERROR_BREAK - ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("getq"))); - if (mb->msg_type () == ACE_Message_Block::MB_HANGUP) - { - ACE_DEBUG ((LM_INFO, - ACE_TEXT ("(%t) Shutting down\n"))); - mb->release (); - break; - } - // Process the message. - process_message (mb); - // Return to work. - this->manager_->return_to_work (this); - } - - return 0; - } - // Listing 2 - - ACE_thread_t thread_id (void) - { - return thread_id_; - } - -private: - void process_message (ACE_Message_Block *mb) - { - ACE_TRACE (ACE_TEXT ("Worker::process_message")); - int msgId; - ACE_OS::memcpy (&msgId, mb->rd_ptr (), sizeof(int)); - mb->release (); - - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) Started processing message %d\n"), - msgId)); - ACE_OS::sleep (3); - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) Finished processing message %d\n"), - msgId)); - } - - IManager *manager_; - ACE_thread_t thread_id_; -}; - -// Listing 1 code/ch16 -class Manager: public ACE_Task<ACE_MT_SYNCH>, private IManager -{ -public: - enum {POOL_SIZE = 5, MAX_TIMEOUT = 5}; - - Manager () - : shutdown_(0), workers_lock_(), workers_cond_(workers_lock_) - { - ACE_TRACE (ACE_TEXT ("Manager::Manager")); - } - - int svc (void) - { - ACE_TRACE (ACE_TEXT ("Manager::svc")); - - ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t) Manager started\n"))); - - // Create pool. - create_worker_pool (); - - while (!done ()) - { - ACE_Message_Block *mb = 0; - ACE_Time_Value tv ((long)MAX_TIMEOUT); - tv += ACE_OS::time (0); - - // Get a message request. - if (this->getq (mb, &tv) < 0) - { - shut_down (); - break; - } - - // Choose a worker. - Worker *worker = 0; - { - ACE_GUARD_RETURN (ACE_Thread_Mutex, - worker_mon, this->workers_lock_, -1); - - while (this->workers_.is_empty ()) - workers_cond_.wait (); - - this->workers_.dequeue_head (worker); - } - - // Ask the worker to do the job. - worker->putq (mb); - } - - return 0; - } - - int shut_down (void); - - ACE_thread_t thread_id (Worker *worker); - - virtual int return_to_work (Worker *worker) - { - ACE_GUARD_RETURN (ACE_Thread_Mutex, - worker_mon, this->workers_lock_, -1); - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) Worker %d returning to work.\n"), - worker->thr_mgr ()->thr_self ())); - this->workers_.enqueue_tail (worker); - this->workers_cond_.signal (); - - return 0; - } - -private: - int create_worker_pool (void) - { - ACE_GUARD_RETURN (ACE_Thread_Mutex, - worker_mon, - this->workers_lock_, - -1); - for (int i = 0; i < POOL_SIZE; i++) - { - Worker *worker; - ACE_NEW_RETURN (worker, Worker (this), -1); - this->workers_.enqueue_tail (worker); - worker->activate (); - } - - return 0; - } - - int done (void); - -private: - int shutdown_; - ACE_Thread_Mutex workers_lock_; - ACE_Condition<ACE_Thread_Mutex> workers_cond_; - ACE_Unbounded_Queue<Worker* > workers_; -}; -// Listing 1 - -int Manager::done (void) -{ - return (shutdown_ == 1); -} - -int -Manager::shut_down (void) -{ - ACE_TRACE (ACE_TEXT ("Manager::shut_down")); - ACE_Unbounded_Queue<Worker* >::ITERATOR iter = - this->workers_.begin (); - Worker **worker_ptr = 0; - do - { - iter.next (worker_ptr); - Worker *worker = (*worker_ptr); - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) Attempting shutdown of %d\n"), - thread_id (worker))); - - // Send the hangup message. - ACE_Message_Block *mb; - ACE_NEW_RETURN - (mb, - ACE_Message_Block(0, - ACE_Message_Block::MB_HANGUP), - -1); - worker->putq (mb); - - // Wait for the exit. - worker->wait (); - - ACE_ASSERT (worker->msg_queue ()->is_empty ()); - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) Worker %d shut down.\n)"), - thread_id (worker))); - delete worker; - } - while (iter.advance ()); - - shutdown_ = 1; - - return 0; -} - -ACE_thread_t -Manager::thread_id (Worker *worker) -{ - return worker->thread_id (); -} - - -int ACE_TMAIN (int, ACE_TCHAR *[]) -{ - Manager tp; - tp.activate (); - - // Wait for a moment every time you send a message. - ACE_Time_Value tv; - tv.msec (100); - - ACE_Message_Block *mb; - for (int i = 0; i < 30; i++) - { - ACE_NEW_RETURN - (mb, ACE_Message_Block(sizeof(int)), -1); - - ACE_OS::memcpy (mb->wr_ptr (), &i, sizeof(int)); - - ACE_OS::sleep (tv); - - // Add a new work item. - tp.putq (mb); - } - - ACE_Thread_Manager::instance ()->wait (); - return 0; -} - -#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) -template class ACE_Condition<ACE_Thread_Mutex>; -template class ACE_Node<Worker*>; -template class ACE_Unbounded_Queue<Worker*>; -template class ACE_Unbounded_Queue_Iterator<Worker*>; -#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) -#pragma instantiate ACE_Condition<ACE_Thread_Mutex> -#pragma instantiate ACE_Node<Worker*> -#pragma instantiate ACE_Unbounded_Queue<Worker*> -#pragma instantiate ACE_Unbounded_Queue_Iterator<Worker*> -#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ - -#else -#include "ace/OS_main.h" -#include "ace/OS_NS_stdio.h" - -int ACE_TMAIN (int, ACE_TCHAR *[]) -{ - ACE_OS::puts (ACE_TEXT ("This example requires threads.")); - return 0; -} - -#endif /* ACE_HAS_THREADS */ |