diff options
author | nobody <nobody@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2004-07-31 16:14:11 +0000 |
---|---|---|
committer | nobody <nobody@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2004-07-31 16:14:11 +0000 |
commit | 9c2ec3049a5634f1c916d792e81fc80edb669ea8 (patch) | |
tree | 0d634e179761581cadbffe852e7f9a9f049b5c6e /examples/APG/ThreadPools | |
parent | 6540653cf736840d5aad719c73a8e43a549080be (diff) | |
download | ATCD-9c2ec3049a5634f1c916d792e81fc80edb669ea8.tar.gz |
This commit was manufactured by cvs2svn to create tag 'TAO-1_4_2'.TAO-1_4_2
Diffstat (limited to 'examples/APG/ThreadPools')
-rw-r--r-- | examples/APG/ThreadPools/Futures.cpp | 343 | ||||
-rw-r--r-- | examples/APG/ThreadPools/LF_ThreadPool.cpp | 264 | ||||
-rw-r--r-- | examples/APG/ThreadPools/Request_Handler.h | 31 | ||||
-rw-r--r-- | examples/APG/ThreadPools/TP_Reactor.cpp | 287 | ||||
-rw-r--r-- | examples/APG/ThreadPools/Task_ThreadPool.cpp | 149 | ||||
-rw-r--r-- | examples/APG/ThreadPools/ThreadPool.cpp | 281 | ||||
-rw-r--r-- | examples/APG/ThreadPools/threadpools.mpc | 37 | ||||
-rw-r--r-- | examples/APG/ThreadPools/threadpools.mwc | 6 |
8 files changed, 0 insertions, 1398 deletions
diff --git a/examples/APG/ThreadPools/Futures.cpp b/examples/APG/ThreadPools/Futures.cpp deleted file mode 100644 index 95f76a524cf..00000000000 --- a/examples/APG/ThreadPools/Futures.cpp +++ /dev/null @@ -1,343 +0,0 @@ -// $Id$ - -#include "ace/config-lite.h" -#if defined (ACE_HAS_THREADS) - -#include "ace/OS_NS_string.h" -#include "ace/OS_NS_time.h" -#include "ace/Task.h" -#include "ace/Unbounded_Queue.h" -#include "ace/Synch.h" -#include "ace/SString.h" -#include "ace/Method_Request.h" -#include "ace/Future.h" -#include "ace/Activation_Queue.h" - -#define OUTSTANDING_REQUESTS 20 - -// Listing 2 code/ch16 -class CompletionCallBack: public ACE_Future_Observer<ACE_CString*> -{ -public: - virtual void update (const ACE_Future<ACE_CString*> & future) - { - ACE_CString *result; - - // Block for the result. - ((ACE_Future<ACE_CString*>)future).get (result); - ACE_DEBUG ((LM_INFO, ACE_TEXT("%C\n"), result->c_str ())); - delete result; - } -}; -// Listing 2 -// Listing 1 code/ch16 -class LongWork : public ACE_Method_Request -{ -public: - virtual int call (void) - { - ACE_TRACE (ACE_TEXT ("LongWork::call")); - ACE_DEBUG - ((LM_INFO, ACE_TEXT ("(%t) Attempting long work task\n"))); - ACE_OS::sleep (1); - - char buf[1024]; - ACE_OS::strcpy (buf, ACE_TEXT ("Completed assigned task\n")); - ACE_CString *msg; - ACE_NEW_RETURN - (msg, ACE_CString (buf, ACE_OS::strlen (buf) + 1), -1); - result_.set (msg); - return 0; - } - - ACE_Future<ACE_CString*> &future (void) - { - ACE_TRACE (ACE_TEXT ("LongWork::future")); - return result_; - } - - void attach (CompletionCallBack *cb) - { - result_.attach (cb); - } - -private: - ACE_Future<ACE_CString*> result_; -}; -// Listing 1 - -class Exit : public ACE_Method_Request -{ -public: - virtual int call (void) - { - ACE_TRACE (ACE_TEXT ("Exit::call")); - return -1; - } -}; - -class Worker; - -class IManager -{ -public: - virtual int return_to_work (Worker *worker) = 0; -}; - -// Listing 3 code/ch16 -class Worker: public ACE_Task<ACE_MT_SYNCH> -{ -public: - Worker (IManager *manager) - : manager_(manager), queue_ (msg_queue ()) - { } - - int perform (ACE_Method_Request *req) - { - ACE_TRACE (ACE_TEXT ("Worker::perform")); - return this->queue_.enqueue (req); - } - - virtual int svc (void) - { - thread_id_ = ACE_Thread::self (); - while (1) - { - ACE_Method_Request *request = this->queue_.dequeue(); - if (request == 0) - return -1; - - // Invoke the request - int result = request->call (); - if (result == -1) - break; - - // Return to work. - this->manager_->return_to_work (this); - } - - return 0; - } - - ACE_thread_t thread_id (void); - -private: - IManager *manager_; - ACE_thread_t thread_id_; - ACE_Activation_Queue queue_; -}; -// Listing 3 - -ACE_thread_t Worker::thread_id (void) -{ - return thread_id_; -} - -// Listing 4 code/ch16 -class Manager : public ACE_Task_Base, private IManager -{ -public: - enum {POOL_SIZE = 5, MAX_TIMEOUT = 5}; - - Manager () - : shutdown_(0), workers_lock_(), workers_cond_(workers_lock_) - { - ACE_TRACE (ACE_TEXT ("Manager::TP")); - } - - int perform (ACE_Method_Request *req) - { - ACE_TRACE (ACE_TEXT ("Manager::perform")); - return this->queue_.enqueue (req); - } - - int svc (void) - { - ACE_TRACE (ACE_TEXT ("Manager::svc")); - - ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t) Manager started\n"))); - - // Create pool when you get in the first time. - create_worker_pool (); - - while (!done ()) - { - ACE_Time_Value tv ((long)MAX_TIMEOUT); - tv += ACE_OS::time (0); - - // Get the next message - ACE_Method_Request *request = this->queue_.dequeue (&tv); - if (request == 0) - { - shut_down (); - break; - } - - // Choose a worker. - Worker *worker = choose_worker (); - - // Ask the worker to do the job. - worker->perform (request); - } - - return 0; - } - - int shut_down (void); - - virtual int return_to_work (Worker *worker) - { - ACE_GUARD_RETURN - (ACE_Thread_Mutex, worker_mon, this->workers_lock_, -1); - ACE_DEBUG - ((LM_DEBUG, ACE_TEXT ("(%t) Worker returning to work.\n"))); - this->workers_.enqueue_tail (worker); - this->workers_cond_.signal (); - - return 0; - } - -private: - Worker *choose_worker (void) - { - ACE_GUARD_RETURN - (ACE_Thread_Mutex, worker_mon, this->workers_lock_, 0) - - while (this->workers_.is_empty ()) - workers_cond_.wait (); - - Worker *worker; - this->workers_.dequeue_head (worker); - return worker; - } - - int create_worker_pool (void) - { - ACE_GUARD_RETURN - (ACE_Thread_Mutex, worker_mon, this->workers_lock_, -1); - for (int i = 0; i < POOL_SIZE; i++) - { - Worker *worker; - ACE_NEW_RETURN (worker, Worker (this), -1); - this->workers_.enqueue_tail (worker); - worker->activate (); - } - - return 0; - } - - int done (void) - { - return (shutdown_ == 1); - } - - ACE_thread_t thread_id (Worker *worker) - { - return worker->thread_id (); - } - -private: - int shutdown_; - ACE_Thread_Mutex workers_lock_; - ACE_Condition<ACE_Thread_Mutex> workers_cond_; - ACE_Unbounded_Queue<Worker* > workers_; - ACE_Activation_Queue queue_; -}; -// Listing 4 - -int -Manager::shut_down (void) -{ - ACE_TRACE (ACE_TEXT ("Manager::shut_down")); - ACE_Unbounded_Queue<Worker* >::ITERATOR iter = this->workers_.begin (); - Worker **worker_ptr = NULL; - do - { - iter.next (worker_ptr); - Worker *worker = (*worker_ptr); - ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Attempting shutdown of %d\n"), - thread_id (worker))); - - Exit *req; - ACE_NEW_RETURN (req, Exit(), -1); - - // Send the hangup message - worker->perform (req); - - // Wait for the exit. - worker->wait (); - - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) Worker %d shut down.\n"), - thread_id (worker))); - - delete req; - delete worker; - - } - while (iter.advance ()); - - shutdown_ = 1; - - return 0; -} - -// Listing 5 code/ch16 -int ACE_TMAIN (int, ACE_TCHAR *[]) -{ - Manager tp; - tp.activate (); - - ACE_Time_Value tv; - tv.msec (100); - - // Wait for a few seconds every time you send a message. - CompletionCallBack cb; - LongWork workArray[OUTSTANDING_REQUESTS]; - for (int i = 0; i < OUTSTANDING_REQUESTS; i++) - { - workArray[i].attach (&cb); - ACE_OS::sleep (tv); - tp.perform (&workArray[i]); - } - - ACE_Thread_Manager::instance ()->wait (); - return 0; -} -// Listing 5 - -#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) -template class ACE_Condition<ACE_Thread_Mutex>; -template class ACE_Future<ACE_String_Base<char>*>; -template class ACE_Future_Observer<ACE_String_Base<char>*>; -template class ACE_Future_Rep<ACE_String_Base<char>*>; -template class ACE_Node<Worker*>; -template class ACE_Node<ACE_Future_Observer<ACE_String_Base<char>*>*>; -template class ACE_Unbounded_Queue<Worker*>; -template class ACE_Unbounded_Queue_Iterator<Worker*>; -template class ACE_Unbounded_Set<ACE_Future_Observer<ACE_String_Base<char>*>*>; -template class ACE_Unbounded_Set_Iterator<ACE_Future_Observer<ACE_String_Base<char>*>*>; -#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) -#pragma instantiate ACE_Condition<ACE_Thread_Mutex> -#pragma instantiate ACE_Future<ACE_String_Base<char>*> -#pragma instantiate ACE_Future_Observer<ACE_String_Base<char>*> -#pragma instantiate ACE_Future_Rep<ACE_String_Base<char>*> -#pragma instantiate ACE_Node<Worker*> -#pragma instantiate ACE_Node<ACE_Future_Observer<ACE_String_Base<char>*>*> -#pragma instantiate ACE_Unbounded_Queue<Worker*> -#pragma instantiate ACE_Unbounded_Queue_Iterator<Worker*> -#pragma instantiate ACE_Unbounded_Set<ACE_Future_Observer<ACE_String_Base<char>*>*> -#pragma instantiate ACE_Unbounded_Set_Iterator<ACE_Future_Observer<ACE_String_Base<char>*>*> -#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ - -#else -#include "ace/OS_main.h" -#include "ace/OS_NS_stdio.h" - -int ACE_TMAIN (int, ACE_TCHAR *[]) -{ - ACE_OS::puts (ACE_TEXT ("This example requires threads.")); - return 0; -} - -#endif /* ACE_HAS_THREADS */ diff --git a/examples/APG/ThreadPools/LF_ThreadPool.cpp b/examples/APG/ThreadPools/LF_ThreadPool.cpp deleted file mode 100644 index 79540435f96..00000000000 --- a/examples/APG/ThreadPools/LF_ThreadPool.cpp +++ /dev/null @@ -1,264 +0,0 @@ -// $Id$ - -#include "ace/config-lite.h" -#if defined (ACE_HAS_THREADS) - -#include "ace/OS_NS_string.h" -#include "ace/OS_NS_sys_time.h" -#include "ace/Task.h" -#include "ace/Containers.h" -#include "ace/Synch.h" - -// Listing 4 code/ch16 -class Follower -{ -public: - Follower (ACE_Thread_Mutex &leader_lock) - : cond_(leader_lock) - { - owner_ = ACE_Thread::self (); - } - - int wait (void) - { - return this->cond_.wait (); - } - - int signal (void) - { - return this->cond_.signal (); - } - - ACE_thread_t owner (void) - { - return this->owner_; - } - -private: - ACE_Condition<ACE_Thread_Mutex> cond_; - ACE_thread_t owner_; -}; -// Listing 4 -// Listing 1 code/ch16 -class LF_ThreadPool : public ACE_Task<ACE_MT_SYNCH> -{ -public: - LF_ThreadPool () : shutdown_(0), current_leader_(0) - { - ACE_TRACE (ACE_TEXT ("LF_ThreadPool::TP")); - } - - virtual int svc (void); - - void shut_down (void) - { - shutdown_ = 1; - } - -private: - int become_leader (void); - - Follower *make_follower (void); - - int elect_new_leader (void); - - int leader_active (void) - { - ACE_TRACE (ACE_TEXT ("LF_ThreadPool::leader_active")); - return this->current_leader_ != 0; - } - - void leader_active (ACE_thread_t leader) - { - ACE_TRACE (ACE_TEXT ("LF_ThreadPool::leader_active")); - this->current_leader_ = leader; - } - - void process_message (ACE_Message_Block *mb); - - int done (void) - { - return (shutdown_ == 1); - } - -private: - int shutdown_; - ACE_thread_t current_leader_; - ACE_Thread_Mutex leader_lock_; - ACE_Unbounded_Queue<Follower*> followers_; - ACE_Thread_Mutex followers_lock_; - static long LONG_TIME; -}; -// Listing 1 -// Listing 2 code/ch16 -int -LF_ThreadPool::svc (void) -{ - ACE_TRACE (ACE_TEXT ("LF_ThreadPool::svc")); - while (!done ()) - { - become_leader (); // Block until this thread is the leader. - - ACE_Message_Block *mb = NULL; - ACE_Time_Value tv (LONG_TIME); - tv += ACE_OS::gettimeofday (); - - // Get a message, elect new leader, then process message. - if (this->getq (mb, &tv) < 0) - { - if (elect_new_leader () == 0) - break; - continue; - } - - elect_new_leader (); - process_message (mb); - } - - return 0; -} -// Listing 2 -// Listing 3 code/ch16 -int -LF_ThreadPool::become_leader (void) -{ - ACE_TRACE (ACE_TEXT ("LF_ThreadPool::become_leader")); - - ACE_GUARD_RETURN - (ACE_Thread_Mutex, leader_mon, this->leader_lock_, -1); - if (leader_active ()) - { - Follower *fw = make_follower (); - { - // Wait until told to do so. - while (leader_active ()) - fw->wait (); - } - - delete fw; - } - - ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Becoming the leader\n"))); - - // Mark yourself as the active leader. - leader_active (ACE_Thread::self ()); - return 0; -} - -Follower* -LF_ThreadPool::make_follower (void) -{ - ACE_TRACE (ACE_TEXT ("LF_ThreadPool::make_follower")); - - ACE_GUARD_RETURN - (ACE_Thread_Mutex, follower_mon, this->followers_lock_, 0); - Follower *fw; - ACE_NEW_RETURN (fw, Follower (this->leader_lock_), 0); - this->followers_.enqueue_tail (fw); - return fw; -} -// Listing 3 -// Listing 5 code/ch16 -int -LF_ThreadPool::elect_new_leader (void) -{ - ACE_TRACE (ACE_TEXT ("LF_ThreadPool::elect_new_leader")); - - ACE_GUARD_RETURN - (ACE_Thread_Mutex, leader_mon, this->leader_lock_, -1); - leader_active (0); - - // Wake up a follower - if (!followers_.is_empty ()) - { - ACE_GUARD_RETURN (ACE_Thread_Mutex, - follower_mon, - this->followers_lock_, - -1); - // Get the old follower. - Follower *fw; - if (this->followers_.dequeue_head (fw) != 0) - return -1; - ACE_DEBUG ((LM_ERROR, - ACE_TEXT ("(%t) Resigning and Electing %d\n"), - fw->owner ())); - return (fw->signal () == 0) ? 0 : -1; - } - else - { - ACE_DEBUG - ((LM_ERROR, ACE_TEXT ("(%t) Oops no followers left\n"))); - return -1; - } -} -// Listing 5 - -void -LF_ThreadPool::process_message (ACE_Message_Block *mb) -{ - ACE_TRACE (ACE_TEXT ("LF_ThreadPool::process_message")); - int msgId; - ACE_OS::memcpy (&msgId, mb->rd_ptr (), sizeof(int)); - mb->release (); - - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) Started processing message:%d\n"), - msgId)); - ACE_OS::sleep (1); - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) Finished processing message:%d\n"), - msgId)); -} - -long LF_ThreadPool::LONG_TIME = 5L; - -int ACE_TMAIN (int, ACE_TCHAR *[]) -{ - LF_ThreadPool tp; - tp.activate (THR_NEW_LWP| THR_JOINABLE, 5); - - // Wait for a few seconds... - ACE_OS::sleep (2); - ACE_Time_Value tv (1L); - - ACE_Message_Block *mb; - for (int i = 0; i < 30; i++) - { - ACE_NEW_RETURN (mb, ACE_Message_Block (sizeof(int)), -1); - ACE_OS::memcpy (mb->wr_ptr (), &i, sizeof(int)); - ACE_OS::sleep (tv); - - // Add a new work item. - tp.putq (mb); - } - - ACE_Thread_Manager::instance ()->wait (); - - ACE_OS::sleep (10); - - return 0; -} - -#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) -template class ACE_Condition<ACE_Thread_Mutex>; -template class ACE_Node<Follower*>; -template class ACE_Unbounded_Queue<Follower*>; -template class ACE_Unbounded_Queue_Iterator<Follower*>; -#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) -#pragma instantiate ACE_Condition<ACE_Thread_Mutex> -#pragma instantiate ACE_Node<Follower*> -#pragma instantiate ACE_Unbounded_Queue<Follower*> -#pragma instantiate ACE_Unbounded_Queue_Iterator<Follower*> -#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ - -#else -#include "ace/OS_main.h" -#include "ace/OS_NS_stdio.h" - -int ACE_TMAIN (int, ACE_TCHAR *[]) -{ - ACE_OS::puts (ACE_TEXT ("This example requires threads.")); - return 0; -} - -#endif /* ACE_HAS_THREADS */ diff --git a/examples/APG/ThreadPools/Request_Handler.h b/examples/APG/ThreadPools/Request_Handler.h deleted file mode 100644 index c534fd57670..00000000000 --- a/examples/APG/ThreadPools/Request_Handler.h +++ /dev/null @@ -1,31 +0,0 @@ -/** - * $Id$ - * - * Sample code from The ACE Programmer's Guide, - * copyright 2003 Addison-Wesley. All Rights Reserved. - */ - -#ifndef __REQUEST_HANDLER_H_ -#define __REQUEST_HANDLER_H_ - -#include "ace/Svc_Handler.h" -#include "ace/SOCK_Stream.h" -class ACE_Thread_Manager; - -class Request_Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_MT_SYNCH> - { - // = TITLE - // This class is the Svc_Handler used by <Acceptor>. - public: - Request_Handler (ACE_Thread_Manager *tm = 0); - // The default constructor makes sure the right reactor is used. - - protected: - virtual int handle_input (ACE_HANDLE fd = ACE_INVALID_HANDLE); - virtual int handle_close (ACE_HANDLE fd, ACE_Reactor_Mask = 0); - - private: - size_t nr_msgs_rcvd_; - }; - -#endif /* __REQUEST_HANDLER_H_ */ diff --git a/examples/APG/ThreadPools/TP_Reactor.cpp b/examples/APG/ThreadPools/TP_Reactor.cpp deleted file mode 100644 index 56998bb2a0d..00000000000 --- a/examples/APG/ThreadPools/TP_Reactor.cpp +++ /dev/null @@ -1,287 +0,0 @@ -// == == == == == == == == == == == == == == == == == == == == == == == -// $Id$ -// Stolen from $ACE_ROOT/tests/Thread_Pool_Reactor_Test.cpp -// Thread_Pool_Reactor_Test.cpp, v 1.29 2001/03/20 01:07:21 irfan Exp -// = AUTHOR -// Irfan Pyarali <irfan@cs.wustl.edu> and -// Nanbor Wang <nanbor@cs.wustl.edu> -// == == == == == == == == == == == == == == == == == == == == == == == - -#include "ace/config-lite.h" -#if defined (ACE_HAS_THREADS) - -#include "ace/OS_NS_string.h" -#include "ace/OS_NS_unistd.h" -#include "ace/SOCK_Connector.h" -#include "ace/SOCK_Acceptor.h" -#include "ace/Acceptor.h" -#include "ace/Thread_Manager.h" -#include "ace/TP_Reactor.h" - -#include "Request_Handler.h" - -// Accepting end point. This is actually "localhost:10010", but some -// platform couldn't resolve the name so we use the IP address -// directly here. -static const ACE_TCHAR *rendezvous = ACE_TEXT ("127.0.0.1:10010"); - -// Total number of server threads. -static size_t svr_thrno = 5; - -// Total number of client threads. -static size_t cli_runs = 2; - -// Total connection attemps of a client thread. -static size_t cli_conn_no = 2; - -// Total requests a client thread sends. -static size_t cli_req_no = 5; - -// Delay before a thread sending the next request (in msec.) -static int req_delay = 50; - - -typedef ACE_Strategy_Acceptor <Request_Handler, ACE_SOCK_ACCEPTOR> ACCEPTOR; - - -Request_Handler::Request_Handler (ACE_Thread_Manager *thr_mgr) - : ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_MT_SYNCH> (thr_mgr), - nr_msgs_rcvd_(0) -{ - this->reactor (ACE_Reactor::instance ()); -} - -int -Request_Handler::handle_input (ACE_HANDLE fd) -{ - ACE_TCHAR buffer[BUFSIZ]; - ACE_TCHAR len = 0; - ssize_t result = this->peer ().recv (&len, sizeof (ACE_TCHAR)); - - if (result > 0 - && this->peer ().recv_n (buffer, len * sizeof (ACE_TCHAR)) - == ACE_static_cast (ssize_t, len * sizeof (ACE_TCHAR))) - { - ++this->nr_msgs_rcvd_; - - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) svr input; fd: 0x%x; input: %s\n"), - fd, - buffer)); - if (ACE_OS::strcmp (buffer, ACE_TEXT ("shutdown")) == 0) - ACE_Reactor::instance()->end_reactor_event_loop (); - return 0; - } - else - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) Request_Handler: 0x%x peer closed (0x%x)\n"), - this, fd)); - return -1; -} - -int -Request_Handler::handle_close (ACE_HANDLE fd, ACE_Reactor_Mask) -{ - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) svr close; fd: 0x%x, rcvd %d msgs\n"), - fd, - this->nr_msgs_rcvd_)); - - if (this->nr_msgs_rcvd_ != cli_req_no) - ACE_ERROR((LM_ERROR, - ACE_TEXT ("(%t) Handler 0x%x: Expected %d messages; got %d\n"), - this, - cli_req_no, - this->nr_msgs_rcvd_)); - - this->destroy (); - return 0; -} - -// Listing 2 code/ch16 -static int -reactor_event_hook (ACE_Reactor *) -{ - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) handling events ....\n"))); - - return 0; -} - -class ServerTP : public ACE_Task_Base -{ -public: - virtual int svc (void) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) Running the event loop\n"))); - - int result = - ACE_Reactor::instance ()->run_reactor_event_loop - (&reactor_event_hook); - - if (result == -1) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_TEXT ("(%t) %p\n"), - ACE_TEXT ("Error handling events")), - 0); - - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) Done handling events.\n"))); - - return 0; - } -}; -// Listing 2 - -class Client: public ACE_Task_Base - { - public: - Client() - :addr_(rendezvous) - {} - - virtual int svc() - { - ACE_OS::sleep (3); - const ACE_TCHAR *msg = - ACE_TEXT ("Message from Connection worker"); - - ACE_TCHAR buf [BUFSIZ]; - buf[0] = ACE_OS::strlen (msg) + 1; - ACE_OS::strcpy (&buf[1], msg); - - for (size_t i = 0; i < cli_runs; i++) - send_work_to_server(buf); - - shut_down(); - - return 0; - } - - private: - void send_work_to_server(ACE_TCHAR* arg) - { - ACE_SOCK_Stream stream; - ACE_SOCK_Connector connect; - ACE_Time_Value delay (0, req_delay); - size_t len = * ACE_reinterpret_cast (ACE_TCHAR *, arg); - - for (size_t i = 0 ; i < cli_conn_no; i++) - { - if (connect.connect (stream, addr_) < 0) - { - ACE_ERROR ((LM_ERROR, - ACE_TEXT ("(%t) %p\n"), - ACE_TEXT ("connect"))); - continue; - } - - for (size_t j = 0; j < cli_req_no; j++) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("Sending work to server on handle 0x%x, req %d\n"), - stream.get_handle (), - j+1)); - if (stream.send_n (arg, - (len + 1) * sizeof (ACE_TCHAR)) == -1) - { - ACE_ERROR ((LM_ERROR, - ACE_TEXT ("(%t) %p\n"), - ACE_TEXT ("send_n"))); - continue; - } - ACE_OS::sleep (delay); - } - - stream.close (); - } - - } - - void shut_down() - { - ACE_SOCK_Stream stream; - ACE_SOCK_Connector connect; - - if (connect.connect (stream, addr_) == -1) - ACE_ERROR ((LM_ERROR, - ACE_TEXT ("(%t) %p Error while connecting\n"), - ACE_TEXT ("connect"))); - - const ACE_TCHAR *sbuf = ACE_TEXT ("\011shutdown"); - - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("shutdown stream handle = %x\n"), - stream.get_handle ())); - - if (stream.send_n (sbuf, (ACE_OS::strlen (sbuf) + 1) * sizeof (ACE_TCHAR)) == -1) - ACE_ERROR ((LM_ERROR, - ACE_TEXT ("(%t) %p\n"), - ACE_TEXT ("send_n"))); - - stream.close (); - } - private: - ACE_INET_Addr addr_; - }; -// Listing 1 code/ch16 -int ACE_TMAIN (int, ACE_TCHAR *[]) -{ - ACE_TP_Reactor sr; - ACE_Reactor new_reactor (&sr); - ACE_Reactor::instance (&new_reactor); - - ACCEPTOR acceptor; - ACE_INET_Addr accept_addr (rendezvous); - - if (acceptor.open (accept_addr) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_TEXT ("%p\n"), - ACE_TEXT ("open")), - 1); - - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) Spawning %d server threads...\n"), - svr_thrno)); - - ServerTP serverTP; - serverTP.activate (THR_NEW_LWP | THR_JOINABLE, svr_thrno); - - Client client; - client.activate (); - - ACE_Thread_Manager::instance ()->wait (); - - return 0; -} -// Listing 1 -#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) -template class ACE_Accept_Strategy<Request_Handler, ACE_SOCK_ACCEPTOR>; -template class ACE_Concurrency_Strategy<Request_Handler>; -template class ACE_Creation_Strategy<Request_Handler>; -template class ACE_Scheduling_Strategy<Request_Handler>; -template class ACE_Acceptor<Request_Handler, ACE_SOCK_ACCEPTOR>; -template class ACE_Strategy_Acceptor<Request_Handler, ACE_SOCK_ACCEPTOR>; -template class ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_MT_SYNCH>; -#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) -#pragma instantiate ACE_Accept_Strategy<Request_Handler, ACE_SOCK_ACCEPTOR> -#pragma instantiate ACE_Concurrency_Strategy<Request_Handler> -#pragma instantiate ACE_Creation_Strategy<Request_Handler> -#pragma instantiate ACE_Scheduling_Strategy<Request_Handler> -#pragma instantiate ACE_Acceptor<Request_Handler, ACE_SOCK_ACCEPTOR> -#pragma instantiate ACE_Strategy_Acceptor<Request_Handler, ACE_SOCK_ACCEPTOR> -#pragma instantiate ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_MT_SYNCH> -#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ - -#else -#include "ace/OS_main.h" -#include "ace/OS_NS_stdio.h" - -int ACE_TMAIN (int, ACE_TCHAR *[]) -{ - ACE_OS::puts (ACE_TEXT ("This example requires threads.")); - return 0; -} - -#endif /* ACE_HAS_THREADS */ diff --git a/examples/APG/ThreadPools/Task_ThreadPool.cpp b/examples/APG/ThreadPools/Task_ThreadPool.cpp deleted file mode 100644 index 53ebe76b0bc..00000000000 --- a/examples/APG/ThreadPools/Task_ThreadPool.cpp +++ /dev/null @@ -1,149 +0,0 @@ -// $Id$ - -#include "ace/config-lite.h" -#if defined (ACE_HAS_THREADS) - -#include "ace/OS_NS_string.h" -#include "ace/OS_NS_time.h" -#include "ace/Task.h" -#include "ace/Synch.h" -#include "ace/SString.h" - -// Listing 2 code/ch16 -class Workers : public ACE_Task<ACE_MT_SYNCH> -{ -public: - Workers () - { } - - virtual int svc (void) - { - while (1) - { - ACE_Message_Block *mb = 0; - if (this->getq (mb) == -1) - { - ACE_DEBUG ((LM_INFO, - ACE_TEXT ("(%t) Shutting down\n"))); - break; - } - - // Process the message. - process_message (mb); - } - - return 0; - } - // Listing 2 - -private: - void process_message (ACE_Message_Block *mb) - { - ACE_TRACE (ACE_TEXT ("Workers::process_message")); - int msgId; - ACE_OS::memcpy (&msgId, mb->rd_ptr (), sizeof(int)); - mb->release (); - - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) Started processing message %d\n"), - msgId)); - ACE_OS::sleep (3); - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) Finished processing message %d\n"), - msgId)); - } -}; - -// Listing 1 code/ch16 -class Manager : public ACE_Task<ACE_MT_SYNCH> -{ -public: - enum {POOL_SIZE = 5, MAX_TIMEOUT = 5}; - - Manager () : shutdown_(0) - { - ACE_TRACE (ACE_TEXT ("Manager::Manager")); - } - - int svc (void) - { - ACE_TRACE (ACE_TEXT ("Manager::svc")); - - ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t) Manager started\n"))); - - // Create pool. - Workers pool; - pool.activate (THR_NEW_LWP | THR_JOINABLE, POOL_SIZE); - - while (!done ()) - { - ACE_Message_Block *mb = 0; - ACE_Time_Value tv ((long)MAX_TIMEOUT); - tv += ACE_OS::time (0); - - // Get a message request. - if (this->getq (mb, &tv) < 0) - { - pool.msg_queue ()->deactivate (); - pool.wait (); - break; - } - - // Ask the worker pool to do the job. - pool.putq (mb); - } - - return 0; - } - -private: - int done (void); - - int shutdown_; -}; -// Listing 1 - -int Manager::done (void) -{ - return (shutdown_ == 1); -} - - -int ACE_TMAIN (int, ACE_TCHAR *[]) -{ - Manager tp; - tp.activate (); - - // Wait for a moment every time you send a message. - ACE_Time_Value tv; - tv.msec (100); - - ACE_Message_Block *mb; - for (int i = 0; i < 30; i++) - { - ACE_NEW_RETURN - (mb, ACE_Message_Block(sizeof(int)), -1); - - ACE_OS::memcpy (mb->wr_ptr (), &i, sizeof(int)); - - ACE_OS::sleep (tv); - - // Add a new work item. - tp.putq (mb); - } - - ACE_Thread_Manager::instance ()->wait (); - return 0; -} - -#else -#include "ace/OS_main.h" -#include "ace/OS_NS_stdio.h" - -int ACE_TMAIN (int, ACE_TCHAR *[]) -{ - ACE_OS::puts (ACE_TEXT ("This example requires threads.")); - return 0; -} - -#endif /* ACE_HAS_THREADS */ diff --git a/examples/APG/ThreadPools/ThreadPool.cpp b/examples/APG/ThreadPools/ThreadPool.cpp deleted file mode 100644 index cfcfcee8b58..00000000000 --- a/examples/APG/ThreadPools/ThreadPool.cpp +++ /dev/null @@ -1,281 +0,0 @@ -// $Id$ - -#include "ace/config-lite.h" -#if defined (ACE_HAS_THREADS) - -#include "ace/OS_NS_string.h" -#include "ace/OS_NS_time.h" -#include "ace/Task.h" -#include "ace/Containers.h" -#include "ace/Synch.h" -#include "ace/SString.h" -#include "ace/Method_Request.h" -#include "ace/Future.h" -#include "ace/Activation_Queue.h" - -class Worker; - -class IManager -{ -public: - virtual int return_to_work (Worker *worker) = 0; -}; - -// Listing 2 code/ch16 -class Worker : public ACE_Task<ACE_MT_SYNCH> -{ -public: - Worker (IManager *manager) : manager_(manager) { } - - virtual int svc (void) - { - thread_id_ = ACE_Thread::self (); - while (1) - { - ACE_Message_Block *mb = 0; - if (this->getq (mb) == -1) - ACE_ERROR_BREAK - ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("getq"))); - if (mb->msg_type () == ACE_Message_Block::MB_HANGUP) - { - ACE_DEBUG ((LM_INFO, - ACE_TEXT ("(%t) Shutting down\n"))); - mb->release (); - break; - } - // Process the message. - process_message (mb); - // Return to work. - this->manager_->return_to_work (this); - } - - return 0; - } - // Listing 2 - - ACE_thread_t thread_id (void) - { - return thread_id_; - } - -private: - void process_message (ACE_Message_Block *mb) - { - ACE_TRACE (ACE_TEXT ("Worker::process_message")); - int msgId; - ACE_OS::memcpy (&msgId, mb->rd_ptr (), sizeof(int)); - mb->release (); - - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) Started processing message %d\n"), - msgId)); - ACE_OS::sleep (3); - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) Finished processing message %d\n"), - msgId)); - } - - IManager *manager_; - ACE_thread_t thread_id_; -}; - -// Listing 1 code/ch16 -class Manager: public ACE_Task<ACE_MT_SYNCH>, private IManager -{ -public: - enum {POOL_SIZE = 5, MAX_TIMEOUT = 5}; - - Manager () - : shutdown_(0), workers_lock_(), workers_cond_(workers_lock_) - { - ACE_TRACE (ACE_TEXT ("Manager::Manager")); - } - - int svc (void) - { - ACE_TRACE (ACE_TEXT ("Manager::svc")); - - ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t) Manager started\n"))); - - // Create pool. - create_worker_pool (); - - while (!done ()) - { - ACE_Message_Block *mb = NULL; - ACE_Time_Value tv ((long)MAX_TIMEOUT); - tv += ACE_OS::time (0); - - // Get a message request. - if (this->getq (mb, &tv) < 0) - { - shut_down (); - break; - } - - // Choose a worker. - Worker *worker; - { - ACE_GUARD_RETURN (ACE_Thread_Mutex, - worker_mon, this->workers_lock_, -1); - - while (this->workers_.is_empty ()) - workers_cond_.wait (); - - this->workers_.dequeue_head (worker); - } - - // Ask the worker to do the job. - worker->putq (mb); - } - - return 0; - } - - int shut_down (void); - - ACE_thread_t thread_id (Worker *worker); - - virtual int return_to_work (Worker *worker) - { - ACE_GUARD_RETURN (ACE_Thread_Mutex, - worker_mon, this->workers_lock_, -1); - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) Worker %d returning to work.\n"), - worker->thr_mgr ()->thr_self ())); - this->workers_.enqueue_tail (worker); - this->workers_cond_.signal (); - - return 0; - } - -private: - int create_worker_pool (void) - { - ACE_GUARD_RETURN (ACE_Thread_Mutex, - worker_mon, - this->workers_lock_, - -1); - for (int i = 0; i < POOL_SIZE; i++) - { - Worker *worker; - ACE_NEW_RETURN (worker, Worker (this), -1); - this->workers_.enqueue_tail (worker); - worker->activate (); - } - - return 0; - } - - int done (void); - -private: - int shutdown_; - ACE_Thread_Mutex workers_lock_; - ACE_Condition<ACE_Thread_Mutex> workers_cond_; - ACE_Unbounded_Queue<Worker* > workers_; -}; -// Listing 1 - -int Manager::done (void) -{ - return (shutdown_ == 1); -} - -int -Manager::shut_down (void) -{ - ACE_TRACE (ACE_TEXT ("Manager::shut_down")); - ACE_Unbounded_Queue<Worker* >::ITERATOR iter = - this->workers_.begin (); - Worker **worker_ptr = NULL; - do - { - iter.next (worker_ptr); - Worker *worker = (*worker_ptr); - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) Attempting shutdown of %d\n"), - thread_id (worker))); - - // Send the hangup message. - ACE_Message_Block *mb; - ACE_NEW_RETURN - (mb, - ACE_Message_Block(0, - ACE_Message_Block::MB_HANGUP), - -1); - worker->putq (mb); - - // Wait for the exit. - worker->wait (); - - ACE_ASSERT (worker->msg_queue ()->is_empty ()); - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) Worker %d shut down.\n)"), - thread_id (worker))); - delete worker; - } - while (iter.advance ()); - - shutdown_ = 1; - - return 0; -} - -ACE_thread_t -Manager::thread_id (Worker *worker) -{ - return worker->thread_id (); -} - - -int ACE_TMAIN (int, ACE_TCHAR *[]) -{ - Manager tp; - tp.activate (); - - // Wait for a moment every time you send a message. - ACE_Time_Value tv; - tv.msec (100); - - ACE_Message_Block *mb; - for (int i = 0; i < 30; i++) - { - ACE_NEW_RETURN - (mb, ACE_Message_Block(sizeof(int)), -1); - - ACE_OS::memcpy (mb->wr_ptr (), &i, sizeof(int)); - - ACE_OS::sleep (tv); - - // Add a new work item. - tp.putq (mb); - } - - ACE_Thread_Manager::instance ()->wait (); - return 0; -} - -#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) -template class ACE_Condition<ACE_Thread_Mutex>; -template class ACE_Node<Worker*>; -template class ACE_Unbounded_Queue<Worker*>; -template class ACE_Unbounded_Queue_Iterator<Worker*>; -#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) -#pragma instantiate ACE_Condition<ACE_Thread_Mutex> -#pragma instantiate ACE_Node<Worker*> -#pragma instantiate ACE_Unbounded_Queue<Worker*> -#pragma instantiate ACE_Unbounded_Queue_Iterator<Worker*> -#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ - -#else -#include "ace/OS_main.h" -#include "ace/OS_NS_stdio.h" - -int ACE_TMAIN (int, ACE_TCHAR *[]) -{ - ACE_OS::puts (ACE_TEXT ("This example requires threads.")); - return 0; -} - -#endif /* ACE_HAS_THREADS */ diff --git a/examples/APG/ThreadPools/threadpools.mpc b/examples/APG/ThreadPools/threadpools.mpc deleted file mode 100644 index 1855f7b40a2..00000000000 --- a/examples/APG/ThreadPools/threadpools.mpc +++ /dev/null @@ -1,37 +0,0 @@ -// -*- MPC -*- -// $Id$ - -project(Futures) : aceexe { - exename = Futures - Source_Files { - Futures.cpp - } -} - -project(LF ThreadPool) : aceexe { - exename = LF_ThreadPool - Source_Files { - LF_ThreadPool.cpp - } -} - -project(Task ThreadPool) : aceexe { - exename = Task_ThreadPool - Source_Files { - Task_ThreadPool.cpp - } -} - -project(ThreadPool) : aceexe { - exename = ThreadPool - Source_Files { - ThreadPool.cpp - } -} - -project(TP Reactor) : aceexe { - exename = TP_Reactor - Source_Files { - TP_Reactor.cpp - } -} diff --git a/examples/APG/ThreadPools/threadpools.mwc b/examples/APG/ThreadPools/threadpools.mwc deleted file mode 100644 index a4c2b0f8085..00000000000 --- a/examples/APG/ThreadPools/threadpools.mwc +++ /dev/null @@ -1,6 +0,0 @@ -// -*- MPC -*- -// $Id$ - -workspace { - threadpools.mpc -} |