summaryrefslogtreecommitdiff
path: root/examples/APG/ThreadPools
diff options
context:
space:
mode:
authornobody <nobody@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2004-07-31 16:14:11 +0000
committernobody <nobody@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2004-07-31 16:14:11 +0000
commit9c2ec3049a5634f1c916d792e81fc80edb669ea8 (patch)
tree0d634e179761581cadbffe852e7f9a9f049b5c6e /examples/APG/ThreadPools
parent6540653cf736840d5aad719c73a8e43a549080be (diff)
downloadATCD-9c2ec3049a5634f1c916d792e81fc80edb669ea8.tar.gz
This commit was manufactured by cvs2svn to create tag 'TAO-1_4_2'.TAO-1_4_2
Diffstat (limited to 'examples/APG/ThreadPools')
-rw-r--r--examples/APG/ThreadPools/Futures.cpp343
-rw-r--r--examples/APG/ThreadPools/LF_ThreadPool.cpp264
-rw-r--r--examples/APG/ThreadPools/Request_Handler.h31
-rw-r--r--examples/APG/ThreadPools/TP_Reactor.cpp287
-rw-r--r--examples/APG/ThreadPools/Task_ThreadPool.cpp149
-rw-r--r--examples/APG/ThreadPools/ThreadPool.cpp281
-rw-r--r--examples/APG/ThreadPools/threadpools.mpc37
-rw-r--r--examples/APG/ThreadPools/threadpools.mwc6
8 files changed, 0 insertions, 1398 deletions
diff --git a/examples/APG/ThreadPools/Futures.cpp b/examples/APG/ThreadPools/Futures.cpp
deleted file mode 100644
index 95f76a524cf..00000000000
--- a/examples/APG/ThreadPools/Futures.cpp
+++ /dev/null
@@ -1,343 +0,0 @@
-// $Id$
-
-#include "ace/config-lite.h"
-#if defined (ACE_HAS_THREADS)
-
-#include "ace/OS_NS_string.h"
-#include "ace/OS_NS_time.h"
-#include "ace/Task.h"
-#include "ace/Unbounded_Queue.h"
-#include "ace/Synch.h"
-#include "ace/SString.h"
-#include "ace/Method_Request.h"
-#include "ace/Future.h"
-#include "ace/Activation_Queue.h"
-
-#define OUTSTANDING_REQUESTS 20
-
-// Listing 2 code/ch16
-class CompletionCallBack: public ACE_Future_Observer<ACE_CString*>
-{
-public:
- virtual void update (const ACE_Future<ACE_CString*> & future)
- {
- ACE_CString *result;
-
- // Block for the result.
- ((ACE_Future<ACE_CString*>)future).get (result);
- ACE_DEBUG ((LM_INFO, ACE_TEXT("%C\n"), result->c_str ()));
- delete result;
- }
-};
-// Listing 2
-// Listing 1 code/ch16
-class LongWork : public ACE_Method_Request
-{
-public:
- virtual int call (void)
- {
- ACE_TRACE (ACE_TEXT ("LongWork::call"));
- ACE_DEBUG
- ((LM_INFO, ACE_TEXT ("(%t) Attempting long work task\n")));
- ACE_OS::sleep (1);
-
- char buf[1024];
- ACE_OS::strcpy (buf, ACE_TEXT ("Completed assigned task\n"));
- ACE_CString *msg;
- ACE_NEW_RETURN
- (msg, ACE_CString (buf, ACE_OS::strlen (buf) + 1), -1);
- result_.set (msg);
- return 0;
- }
-
- ACE_Future<ACE_CString*> &future (void)
- {
- ACE_TRACE (ACE_TEXT ("LongWork::future"));
- return result_;
- }
-
- void attach (CompletionCallBack *cb)
- {
- result_.attach (cb);
- }
-
-private:
- ACE_Future<ACE_CString*> result_;
-};
-// Listing 1
-
-class Exit : public ACE_Method_Request
-{
-public:
- virtual int call (void)
- {
- ACE_TRACE (ACE_TEXT ("Exit::call"));
- return -1;
- }
-};
-
-class Worker;
-
-class IManager
-{
-public:
- virtual int return_to_work (Worker *worker) = 0;
-};
-
-// Listing 3 code/ch16
-class Worker: public ACE_Task<ACE_MT_SYNCH>
-{
-public:
- Worker (IManager *manager)
- : manager_(manager), queue_ (msg_queue ())
- { }
-
- int perform (ACE_Method_Request *req)
- {
- ACE_TRACE (ACE_TEXT ("Worker::perform"));
- return this->queue_.enqueue (req);
- }
-
- virtual int svc (void)
- {
- thread_id_ = ACE_Thread::self ();
- while (1)
- {
- ACE_Method_Request *request = this->queue_.dequeue();
- if (request == 0)
- return -1;
-
- // Invoke the request
- int result = request->call ();
- if (result == -1)
- break;
-
- // Return to work.
- this->manager_->return_to_work (this);
- }
-
- return 0;
- }
-
- ACE_thread_t thread_id (void);
-
-private:
- IManager *manager_;
- ACE_thread_t thread_id_;
- ACE_Activation_Queue queue_;
-};
-// Listing 3
-
-ACE_thread_t Worker::thread_id (void)
-{
- return thread_id_;
-}
-
-// Listing 4 code/ch16
-class Manager : public ACE_Task_Base, private IManager
-{
-public:
- enum {POOL_SIZE = 5, MAX_TIMEOUT = 5};
-
- Manager ()
- : shutdown_(0), workers_lock_(), workers_cond_(workers_lock_)
- {
- ACE_TRACE (ACE_TEXT ("Manager::TP"));
- }
-
- int perform (ACE_Method_Request *req)
- {
- ACE_TRACE (ACE_TEXT ("Manager::perform"));
- return this->queue_.enqueue (req);
- }
-
- int svc (void)
- {
- ACE_TRACE (ACE_TEXT ("Manager::svc"));
-
- ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t) Manager started\n")));
-
- // Create pool when you get in the first time.
- create_worker_pool ();
-
- while (!done ())
- {
- ACE_Time_Value tv ((long)MAX_TIMEOUT);
- tv += ACE_OS::time (0);
-
- // Get the next message
- ACE_Method_Request *request = this->queue_.dequeue (&tv);
- if (request == 0)
- {
- shut_down ();
- break;
- }
-
- // Choose a worker.
- Worker *worker = choose_worker ();
-
- // Ask the worker to do the job.
- worker->perform (request);
- }
-
- return 0;
- }
-
- int shut_down (void);
-
- virtual int return_to_work (Worker *worker)
- {
- ACE_GUARD_RETURN
- (ACE_Thread_Mutex, worker_mon, this->workers_lock_, -1);
- ACE_DEBUG
- ((LM_DEBUG, ACE_TEXT ("(%t) Worker returning to work.\n")));
- this->workers_.enqueue_tail (worker);
- this->workers_cond_.signal ();
-
- return 0;
- }
-
-private:
- Worker *choose_worker (void)
- {
- ACE_GUARD_RETURN
- (ACE_Thread_Mutex, worker_mon, this->workers_lock_, 0)
-
- while (this->workers_.is_empty ())
- workers_cond_.wait ();
-
- Worker *worker;
- this->workers_.dequeue_head (worker);
- return worker;
- }
-
- int create_worker_pool (void)
- {
- ACE_GUARD_RETURN
- (ACE_Thread_Mutex, worker_mon, this->workers_lock_, -1);
- for (int i = 0; i < POOL_SIZE; i++)
- {
- Worker *worker;
- ACE_NEW_RETURN (worker, Worker (this), -1);
- this->workers_.enqueue_tail (worker);
- worker->activate ();
- }
-
- return 0;
- }
-
- int done (void)
- {
- return (shutdown_ == 1);
- }
-
- ACE_thread_t thread_id (Worker *worker)
- {
- return worker->thread_id ();
- }
-
-private:
- int shutdown_;
- ACE_Thread_Mutex workers_lock_;
- ACE_Condition<ACE_Thread_Mutex> workers_cond_;
- ACE_Unbounded_Queue<Worker* > workers_;
- ACE_Activation_Queue queue_;
-};
-// Listing 4
-
-int
-Manager::shut_down (void)
-{
- ACE_TRACE (ACE_TEXT ("Manager::shut_down"));
- ACE_Unbounded_Queue<Worker* >::ITERATOR iter = this->workers_.begin ();
- Worker **worker_ptr = NULL;
- do
- {
- iter.next (worker_ptr);
- Worker *worker = (*worker_ptr);
- ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Attempting shutdown of %d\n"),
- thread_id (worker)));
-
- Exit *req;
- ACE_NEW_RETURN (req, Exit(), -1);
-
- // Send the hangup message
- worker->perform (req);
-
- // Wait for the exit.
- worker->wait ();
-
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Worker %d shut down.\n"),
- thread_id (worker)));
-
- delete req;
- delete worker;
-
- }
- while (iter.advance ());
-
- shutdown_ = 1;
-
- return 0;
-}
-
-// Listing 5 code/ch16
-int ACE_TMAIN (int, ACE_TCHAR *[])
-{
- Manager tp;
- tp.activate ();
-
- ACE_Time_Value tv;
- tv.msec (100);
-
- // Wait for a few seconds every time you send a message.
- CompletionCallBack cb;
- LongWork workArray[OUTSTANDING_REQUESTS];
- for (int i = 0; i < OUTSTANDING_REQUESTS; i++)
- {
- workArray[i].attach (&cb);
- ACE_OS::sleep (tv);
- tp.perform (&workArray[i]);
- }
-
- ACE_Thread_Manager::instance ()->wait ();
- return 0;
-}
-// Listing 5
-
-#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
-template class ACE_Condition<ACE_Thread_Mutex>;
-template class ACE_Future<ACE_String_Base<char>*>;
-template class ACE_Future_Observer<ACE_String_Base<char>*>;
-template class ACE_Future_Rep<ACE_String_Base<char>*>;
-template class ACE_Node<Worker*>;
-template class ACE_Node<ACE_Future_Observer<ACE_String_Base<char>*>*>;
-template class ACE_Unbounded_Queue<Worker*>;
-template class ACE_Unbounded_Queue_Iterator<Worker*>;
-template class ACE_Unbounded_Set<ACE_Future_Observer<ACE_String_Base<char>*>*>;
-template class ACE_Unbounded_Set_Iterator<ACE_Future_Observer<ACE_String_Base<char>*>*>;
-#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
-#pragma instantiate ACE_Condition<ACE_Thread_Mutex>
-#pragma instantiate ACE_Future<ACE_String_Base<char>*>
-#pragma instantiate ACE_Future_Observer<ACE_String_Base<char>*>
-#pragma instantiate ACE_Future_Rep<ACE_String_Base<char>*>
-#pragma instantiate ACE_Node<Worker*>
-#pragma instantiate ACE_Node<ACE_Future_Observer<ACE_String_Base<char>*>*>
-#pragma instantiate ACE_Unbounded_Queue<Worker*>
-#pragma instantiate ACE_Unbounded_Queue_Iterator<Worker*>
-#pragma instantiate ACE_Unbounded_Set<ACE_Future_Observer<ACE_String_Base<char>*>*>
-#pragma instantiate ACE_Unbounded_Set_Iterator<ACE_Future_Observer<ACE_String_Base<char>*>*>
-#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
-
-#else
-#include "ace/OS_main.h"
-#include "ace/OS_NS_stdio.h"
-
-int ACE_TMAIN (int, ACE_TCHAR *[])
-{
- ACE_OS::puts (ACE_TEXT ("This example requires threads."));
- return 0;
-}
-
-#endif /* ACE_HAS_THREADS */
diff --git a/examples/APG/ThreadPools/LF_ThreadPool.cpp b/examples/APG/ThreadPools/LF_ThreadPool.cpp
deleted file mode 100644
index 79540435f96..00000000000
--- a/examples/APG/ThreadPools/LF_ThreadPool.cpp
+++ /dev/null
@@ -1,264 +0,0 @@
-// $Id$
-
-#include "ace/config-lite.h"
-#if defined (ACE_HAS_THREADS)
-
-#include "ace/OS_NS_string.h"
-#include "ace/OS_NS_sys_time.h"
-#include "ace/Task.h"
-#include "ace/Containers.h"
-#include "ace/Synch.h"
-
-// Listing 4 code/ch16
-class Follower
-{
-public:
- Follower (ACE_Thread_Mutex &leader_lock)
- : cond_(leader_lock)
- {
- owner_ = ACE_Thread::self ();
- }
-
- int wait (void)
- {
- return this->cond_.wait ();
- }
-
- int signal (void)
- {
- return this->cond_.signal ();
- }
-
- ACE_thread_t owner (void)
- {
- return this->owner_;
- }
-
-private:
- ACE_Condition<ACE_Thread_Mutex> cond_;
- ACE_thread_t owner_;
-};
-// Listing 4
-// Listing 1 code/ch16
-class LF_ThreadPool : public ACE_Task<ACE_MT_SYNCH>
-{
-public:
- LF_ThreadPool () : shutdown_(0), current_leader_(0)
- {
- ACE_TRACE (ACE_TEXT ("LF_ThreadPool::TP"));
- }
-
- virtual int svc (void);
-
- void shut_down (void)
- {
- shutdown_ = 1;
- }
-
-private:
- int become_leader (void);
-
- Follower *make_follower (void);
-
- int elect_new_leader (void);
-
- int leader_active (void)
- {
- ACE_TRACE (ACE_TEXT ("LF_ThreadPool::leader_active"));
- return this->current_leader_ != 0;
- }
-
- void leader_active (ACE_thread_t leader)
- {
- ACE_TRACE (ACE_TEXT ("LF_ThreadPool::leader_active"));
- this->current_leader_ = leader;
- }
-
- void process_message (ACE_Message_Block *mb);
-
- int done (void)
- {
- return (shutdown_ == 1);
- }
-
-private:
- int shutdown_;
- ACE_thread_t current_leader_;
- ACE_Thread_Mutex leader_lock_;
- ACE_Unbounded_Queue<Follower*> followers_;
- ACE_Thread_Mutex followers_lock_;
- static long LONG_TIME;
-};
-// Listing 1
-// Listing 2 code/ch16
-int
-LF_ThreadPool::svc (void)
-{
- ACE_TRACE (ACE_TEXT ("LF_ThreadPool::svc"));
- while (!done ())
- {
- become_leader (); // Block until this thread is the leader.
-
- ACE_Message_Block *mb = NULL;
- ACE_Time_Value tv (LONG_TIME);
- tv += ACE_OS::gettimeofday ();
-
- // Get a message, elect new leader, then process message.
- if (this->getq (mb, &tv) < 0)
- {
- if (elect_new_leader () == 0)
- break;
- continue;
- }
-
- elect_new_leader ();
- process_message (mb);
- }
-
- return 0;
-}
-// Listing 2
-// Listing 3 code/ch16
-int
-LF_ThreadPool::become_leader (void)
-{
- ACE_TRACE (ACE_TEXT ("LF_ThreadPool::become_leader"));
-
- ACE_GUARD_RETURN
- (ACE_Thread_Mutex, leader_mon, this->leader_lock_, -1);
- if (leader_active ())
- {
- Follower *fw = make_follower ();
- {
- // Wait until told to do so.
- while (leader_active ())
- fw->wait ();
- }
-
- delete fw;
- }
-
- ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Becoming the leader\n")));
-
- // Mark yourself as the active leader.
- leader_active (ACE_Thread::self ());
- return 0;
-}
-
-Follower*
-LF_ThreadPool::make_follower (void)
-{
- ACE_TRACE (ACE_TEXT ("LF_ThreadPool::make_follower"));
-
- ACE_GUARD_RETURN
- (ACE_Thread_Mutex, follower_mon, this->followers_lock_, 0);
- Follower *fw;
- ACE_NEW_RETURN (fw, Follower (this->leader_lock_), 0);
- this->followers_.enqueue_tail (fw);
- return fw;
-}
-// Listing 3
-// Listing 5 code/ch16
-int
-LF_ThreadPool::elect_new_leader (void)
-{
- ACE_TRACE (ACE_TEXT ("LF_ThreadPool::elect_new_leader"));
-
- ACE_GUARD_RETURN
- (ACE_Thread_Mutex, leader_mon, this->leader_lock_, -1);
- leader_active (0);
-
- // Wake up a follower
- if (!followers_.is_empty ())
- {
- ACE_GUARD_RETURN (ACE_Thread_Mutex,
- follower_mon,
- this->followers_lock_,
- -1);
- // Get the old follower.
- Follower *fw;
- if (this->followers_.dequeue_head (fw) != 0)
- return -1;
- ACE_DEBUG ((LM_ERROR,
- ACE_TEXT ("(%t) Resigning and Electing %d\n"),
- fw->owner ()));
- return (fw->signal () == 0) ? 0 : -1;
- }
- else
- {
- ACE_DEBUG
- ((LM_ERROR, ACE_TEXT ("(%t) Oops no followers left\n")));
- return -1;
- }
-}
-// Listing 5
-
-void
-LF_ThreadPool::process_message (ACE_Message_Block *mb)
-{
- ACE_TRACE (ACE_TEXT ("LF_ThreadPool::process_message"));
- int msgId;
- ACE_OS::memcpy (&msgId, mb->rd_ptr (), sizeof(int));
- mb->release ();
-
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Started processing message:%d\n"),
- msgId));
- ACE_OS::sleep (1);
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Finished processing message:%d\n"),
- msgId));
-}
-
-long LF_ThreadPool::LONG_TIME = 5L;
-
-int ACE_TMAIN (int, ACE_TCHAR *[])
-{
- LF_ThreadPool tp;
- tp.activate (THR_NEW_LWP| THR_JOINABLE, 5);
-
- // Wait for a few seconds...
- ACE_OS::sleep (2);
- ACE_Time_Value tv (1L);
-
- ACE_Message_Block *mb;
- for (int i = 0; i < 30; i++)
- {
- ACE_NEW_RETURN (mb, ACE_Message_Block (sizeof(int)), -1);
- ACE_OS::memcpy (mb->wr_ptr (), &i, sizeof(int));
- ACE_OS::sleep (tv);
-
- // Add a new work item.
- tp.putq (mb);
- }
-
- ACE_Thread_Manager::instance ()->wait ();
-
- ACE_OS::sleep (10);
-
- return 0;
-}
-
-#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
-template class ACE_Condition<ACE_Thread_Mutex>;
-template class ACE_Node<Follower*>;
-template class ACE_Unbounded_Queue<Follower*>;
-template class ACE_Unbounded_Queue_Iterator<Follower*>;
-#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
-#pragma instantiate ACE_Condition<ACE_Thread_Mutex>
-#pragma instantiate ACE_Node<Follower*>
-#pragma instantiate ACE_Unbounded_Queue<Follower*>
-#pragma instantiate ACE_Unbounded_Queue_Iterator<Follower*>
-#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
-
-#else
-#include "ace/OS_main.h"
-#include "ace/OS_NS_stdio.h"
-
-int ACE_TMAIN (int, ACE_TCHAR *[])
-{
- ACE_OS::puts (ACE_TEXT ("This example requires threads."));
- return 0;
-}
-
-#endif /* ACE_HAS_THREADS */
diff --git a/examples/APG/ThreadPools/Request_Handler.h b/examples/APG/ThreadPools/Request_Handler.h
deleted file mode 100644
index c534fd57670..00000000000
--- a/examples/APG/ThreadPools/Request_Handler.h
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * $Id$
- *
- * Sample code from The ACE Programmer's Guide,
- * copyright 2003 Addison-Wesley. All Rights Reserved.
- */
-
-#ifndef __REQUEST_HANDLER_H_
-#define __REQUEST_HANDLER_H_
-
-#include "ace/Svc_Handler.h"
-#include "ace/SOCK_Stream.h"
-class ACE_Thread_Manager;
-
-class Request_Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_MT_SYNCH>
- {
- // = TITLE
- // This class is the Svc_Handler used by <Acceptor>.
- public:
- Request_Handler (ACE_Thread_Manager *tm = 0);
- // The default constructor makes sure the right reactor is used.
-
- protected:
- virtual int handle_input (ACE_HANDLE fd = ACE_INVALID_HANDLE);
- virtual int handle_close (ACE_HANDLE fd, ACE_Reactor_Mask = 0);
-
- private:
- size_t nr_msgs_rcvd_;
- };
-
-#endif /* __REQUEST_HANDLER_H_ */
diff --git a/examples/APG/ThreadPools/TP_Reactor.cpp b/examples/APG/ThreadPools/TP_Reactor.cpp
deleted file mode 100644
index 56998bb2a0d..00000000000
--- a/examples/APG/ThreadPools/TP_Reactor.cpp
+++ /dev/null
@@ -1,287 +0,0 @@
-// == == == == == == == == == == == == == == == == == == == == == == ==
-// $Id$
-// Stolen from $ACE_ROOT/tests/Thread_Pool_Reactor_Test.cpp
-// Thread_Pool_Reactor_Test.cpp, v 1.29 2001/03/20 01:07:21 irfan Exp
-// = AUTHOR
-// Irfan Pyarali <irfan@cs.wustl.edu> and
-// Nanbor Wang <nanbor@cs.wustl.edu>
-// == == == == == == == == == == == == == == == == == == == == == == ==
-
-#include "ace/config-lite.h"
-#if defined (ACE_HAS_THREADS)
-
-#include "ace/OS_NS_string.h"
-#include "ace/OS_NS_unistd.h"
-#include "ace/SOCK_Connector.h"
-#include "ace/SOCK_Acceptor.h"
-#include "ace/Acceptor.h"
-#include "ace/Thread_Manager.h"
-#include "ace/TP_Reactor.h"
-
-#include "Request_Handler.h"
-
-// Accepting end point. This is actually "localhost:10010", but some
-// platform couldn't resolve the name so we use the IP address
-// directly here.
-static const ACE_TCHAR *rendezvous = ACE_TEXT ("127.0.0.1:10010");
-
-// Total number of server threads.
-static size_t svr_thrno = 5;
-
-// Total number of client threads.
-static size_t cli_runs = 2;
-
-// Total connection attemps of a client thread.
-static size_t cli_conn_no = 2;
-
-// Total requests a client thread sends.
-static size_t cli_req_no = 5;
-
-// Delay before a thread sending the next request (in msec.)
-static int req_delay = 50;
-
-
-typedef ACE_Strategy_Acceptor <Request_Handler, ACE_SOCK_ACCEPTOR> ACCEPTOR;
-
-
-Request_Handler::Request_Handler (ACE_Thread_Manager *thr_mgr)
- : ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_MT_SYNCH> (thr_mgr),
- nr_msgs_rcvd_(0)
-{
- this->reactor (ACE_Reactor::instance ());
-}
-
-int
-Request_Handler::handle_input (ACE_HANDLE fd)
-{
- ACE_TCHAR buffer[BUFSIZ];
- ACE_TCHAR len = 0;
- ssize_t result = this->peer ().recv (&len, sizeof (ACE_TCHAR));
-
- if (result > 0
- && this->peer ().recv_n (buffer, len * sizeof (ACE_TCHAR))
- == ACE_static_cast (ssize_t, len * sizeof (ACE_TCHAR)))
- {
- ++this->nr_msgs_rcvd_;
-
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) svr input; fd: 0x%x; input: %s\n"),
- fd,
- buffer));
- if (ACE_OS::strcmp (buffer, ACE_TEXT ("shutdown")) == 0)
- ACE_Reactor::instance()->end_reactor_event_loop ();
- return 0;
- }
- else
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Request_Handler: 0x%x peer closed (0x%x)\n"),
- this, fd));
- return -1;
-}
-
-int
-Request_Handler::handle_close (ACE_HANDLE fd, ACE_Reactor_Mask)
-{
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) svr close; fd: 0x%x, rcvd %d msgs\n"),
- fd,
- this->nr_msgs_rcvd_));
-
- if (this->nr_msgs_rcvd_ != cli_req_no)
- ACE_ERROR((LM_ERROR,
- ACE_TEXT ("(%t) Handler 0x%x: Expected %d messages; got %d\n"),
- this,
- cli_req_no,
- this->nr_msgs_rcvd_));
-
- this->destroy ();
- return 0;
-}
-
-// Listing 2 code/ch16
-static int
-reactor_event_hook (ACE_Reactor *)
-{
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) handling events ....\n")));
-
- return 0;
-}
-
-class ServerTP : public ACE_Task_Base
-{
-public:
- virtual int svc (void)
- {
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Running the event loop\n")));
-
- int result =
- ACE_Reactor::instance ()->run_reactor_event_loop
- (&reactor_event_hook);
-
- if (result == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT ("(%t) %p\n"),
- ACE_TEXT ("Error handling events")),
- 0);
-
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Done handling events.\n")));
-
- return 0;
- }
-};
-// Listing 2
-
-class Client: public ACE_Task_Base
- {
- public:
- Client()
- :addr_(rendezvous)
- {}
-
- virtual int svc()
- {
- ACE_OS::sleep (3);
- const ACE_TCHAR *msg =
- ACE_TEXT ("Message from Connection worker");
-
- ACE_TCHAR buf [BUFSIZ];
- buf[0] = ACE_OS::strlen (msg) + 1;
- ACE_OS::strcpy (&buf[1], msg);
-
- for (size_t i = 0; i < cli_runs; i++)
- send_work_to_server(buf);
-
- shut_down();
-
- return 0;
- }
-
- private:
- void send_work_to_server(ACE_TCHAR* arg)
- {
- ACE_SOCK_Stream stream;
- ACE_SOCK_Connector connect;
- ACE_Time_Value delay (0, req_delay);
- size_t len = * ACE_reinterpret_cast (ACE_TCHAR *, arg);
-
- for (size_t i = 0 ; i < cli_conn_no; i++)
- {
- if (connect.connect (stream, addr_) < 0)
- {
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("(%t) %p\n"),
- ACE_TEXT ("connect")));
- continue;
- }
-
- for (size_t j = 0; j < cli_req_no; j++)
- {
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("Sending work to server on handle 0x%x, req %d\n"),
- stream.get_handle (),
- j+1));
- if (stream.send_n (arg,
- (len + 1) * sizeof (ACE_TCHAR)) == -1)
- {
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("(%t) %p\n"),
- ACE_TEXT ("send_n")));
- continue;
- }
- ACE_OS::sleep (delay);
- }
-
- stream.close ();
- }
-
- }
-
- void shut_down()
- {
- ACE_SOCK_Stream stream;
- ACE_SOCK_Connector connect;
-
- if (connect.connect (stream, addr_) == -1)
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("(%t) %p Error while connecting\n"),
- ACE_TEXT ("connect")));
-
- const ACE_TCHAR *sbuf = ACE_TEXT ("\011shutdown");
-
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("shutdown stream handle = %x\n"),
- stream.get_handle ()));
-
- if (stream.send_n (sbuf, (ACE_OS::strlen (sbuf) + 1) * sizeof (ACE_TCHAR)) == -1)
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("(%t) %p\n"),
- ACE_TEXT ("send_n")));
-
- stream.close ();
- }
- private:
- ACE_INET_Addr addr_;
- };
-// Listing 1 code/ch16
-int ACE_TMAIN (int, ACE_TCHAR *[])
-{
- ACE_TP_Reactor sr;
- ACE_Reactor new_reactor (&sr);
- ACE_Reactor::instance (&new_reactor);
-
- ACCEPTOR acceptor;
- ACE_INET_Addr accept_addr (rendezvous);
-
- if (acceptor.open (accept_addr) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT ("%p\n"),
- ACE_TEXT ("open")),
- 1);
-
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Spawning %d server threads...\n"),
- svr_thrno));
-
- ServerTP serverTP;
- serverTP.activate (THR_NEW_LWP | THR_JOINABLE, svr_thrno);
-
- Client client;
- client.activate ();
-
- ACE_Thread_Manager::instance ()->wait ();
-
- return 0;
-}
-// Listing 1
-#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
-template class ACE_Accept_Strategy<Request_Handler, ACE_SOCK_ACCEPTOR>;
-template class ACE_Concurrency_Strategy<Request_Handler>;
-template class ACE_Creation_Strategy<Request_Handler>;
-template class ACE_Scheduling_Strategy<Request_Handler>;
-template class ACE_Acceptor<Request_Handler, ACE_SOCK_ACCEPTOR>;
-template class ACE_Strategy_Acceptor<Request_Handler, ACE_SOCK_ACCEPTOR>;
-template class ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_MT_SYNCH>;
-#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
-#pragma instantiate ACE_Accept_Strategy<Request_Handler, ACE_SOCK_ACCEPTOR>
-#pragma instantiate ACE_Concurrency_Strategy<Request_Handler>
-#pragma instantiate ACE_Creation_Strategy<Request_Handler>
-#pragma instantiate ACE_Scheduling_Strategy<Request_Handler>
-#pragma instantiate ACE_Acceptor<Request_Handler, ACE_SOCK_ACCEPTOR>
-#pragma instantiate ACE_Strategy_Acceptor<Request_Handler, ACE_SOCK_ACCEPTOR>
-#pragma instantiate ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_MT_SYNCH>
-#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
-
-#else
-#include "ace/OS_main.h"
-#include "ace/OS_NS_stdio.h"
-
-int ACE_TMAIN (int, ACE_TCHAR *[])
-{
- ACE_OS::puts (ACE_TEXT ("This example requires threads."));
- return 0;
-}
-
-#endif /* ACE_HAS_THREADS */
diff --git a/examples/APG/ThreadPools/Task_ThreadPool.cpp b/examples/APG/ThreadPools/Task_ThreadPool.cpp
deleted file mode 100644
index 53ebe76b0bc..00000000000
--- a/examples/APG/ThreadPools/Task_ThreadPool.cpp
+++ /dev/null
@@ -1,149 +0,0 @@
-// $Id$
-
-#include "ace/config-lite.h"
-#if defined (ACE_HAS_THREADS)
-
-#include "ace/OS_NS_string.h"
-#include "ace/OS_NS_time.h"
-#include "ace/Task.h"
-#include "ace/Synch.h"
-#include "ace/SString.h"
-
-// Listing 2 code/ch16
-class Workers : public ACE_Task<ACE_MT_SYNCH>
-{
-public:
- Workers ()
- { }
-
- virtual int svc (void)
- {
- while (1)
- {
- ACE_Message_Block *mb = 0;
- if (this->getq (mb) == -1)
- {
- ACE_DEBUG ((LM_INFO,
- ACE_TEXT ("(%t) Shutting down\n")));
- break;
- }
-
- // Process the message.
- process_message (mb);
- }
-
- return 0;
- }
- // Listing 2
-
-private:
- void process_message (ACE_Message_Block *mb)
- {
- ACE_TRACE (ACE_TEXT ("Workers::process_message"));
- int msgId;
- ACE_OS::memcpy (&msgId, mb->rd_ptr (), sizeof(int));
- mb->release ();
-
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Started processing message %d\n"),
- msgId));
- ACE_OS::sleep (3);
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Finished processing message %d\n"),
- msgId));
- }
-};
-
-// Listing 1 code/ch16
-class Manager : public ACE_Task<ACE_MT_SYNCH>
-{
-public:
- enum {POOL_SIZE = 5, MAX_TIMEOUT = 5};
-
- Manager () : shutdown_(0)
- {
- ACE_TRACE (ACE_TEXT ("Manager::Manager"));
- }
-
- int svc (void)
- {
- ACE_TRACE (ACE_TEXT ("Manager::svc"));
-
- ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t) Manager started\n")));
-
- // Create pool.
- Workers pool;
- pool.activate (THR_NEW_LWP | THR_JOINABLE, POOL_SIZE);
-
- while (!done ())
- {
- ACE_Message_Block *mb = 0;
- ACE_Time_Value tv ((long)MAX_TIMEOUT);
- tv += ACE_OS::time (0);
-
- // Get a message request.
- if (this->getq (mb, &tv) < 0)
- {
- pool.msg_queue ()->deactivate ();
- pool.wait ();
- break;
- }
-
- // Ask the worker pool to do the job.
- pool.putq (mb);
- }
-
- return 0;
- }
-
-private:
- int done (void);
-
- int shutdown_;
-};
-// Listing 1
-
-int Manager::done (void)
-{
- return (shutdown_ == 1);
-}
-
-
-int ACE_TMAIN (int, ACE_TCHAR *[])
-{
- Manager tp;
- tp.activate ();
-
- // Wait for a moment every time you send a message.
- ACE_Time_Value tv;
- tv.msec (100);
-
- ACE_Message_Block *mb;
- for (int i = 0; i < 30; i++)
- {
- ACE_NEW_RETURN
- (mb, ACE_Message_Block(sizeof(int)), -1);
-
- ACE_OS::memcpy (mb->wr_ptr (), &i, sizeof(int));
-
- ACE_OS::sleep (tv);
-
- // Add a new work item.
- tp.putq (mb);
- }
-
- ACE_Thread_Manager::instance ()->wait ();
- return 0;
-}
-
-#else
-#include "ace/OS_main.h"
-#include "ace/OS_NS_stdio.h"
-
-int ACE_TMAIN (int, ACE_TCHAR *[])
-{
- ACE_OS::puts (ACE_TEXT ("This example requires threads."));
- return 0;
-}
-
-#endif /* ACE_HAS_THREADS */
diff --git a/examples/APG/ThreadPools/ThreadPool.cpp b/examples/APG/ThreadPools/ThreadPool.cpp
deleted file mode 100644
index cfcfcee8b58..00000000000
--- a/examples/APG/ThreadPools/ThreadPool.cpp
+++ /dev/null
@@ -1,281 +0,0 @@
-// $Id$
-
-#include "ace/config-lite.h"
-#if defined (ACE_HAS_THREADS)
-
-#include "ace/OS_NS_string.h"
-#include "ace/OS_NS_time.h"
-#include "ace/Task.h"
-#include "ace/Containers.h"
-#include "ace/Synch.h"
-#include "ace/SString.h"
-#include "ace/Method_Request.h"
-#include "ace/Future.h"
-#include "ace/Activation_Queue.h"
-
-class Worker;
-
-class IManager
-{
-public:
- virtual int return_to_work (Worker *worker) = 0;
-};
-
-// Listing 2 code/ch16
-class Worker : public ACE_Task<ACE_MT_SYNCH>
-{
-public:
- Worker (IManager *manager) : manager_(manager) { }
-
- virtual int svc (void)
- {
- thread_id_ = ACE_Thread::self ();
- while (1)
- {
- ACE_Message_Block *mb = 0;
- if (this->getq (mb) == -1)
- ACE_ERROR_BREAK
- ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("getq")));
- if (mb->msg_type () == ACE_Message_Block::MB_HANGUP)
- {
- ACE_DEBUG ((LM_INFO,
- ACE_TEXT ("(%t) Shutting down\n")));
- mb->release ();
- break;
- }
- // Process the message.
- process_message (mb);
- // Return to work.
- this->manager_->return_to_work (this);
- }
-
- return 0;
- }
- // Listing 2
-
- ACE_thread_t thread_id (void)
- {
- return thread_id_;
- }
-
-private:
- void process_message (ACE_Message_Block *mb)
- {
- ACE_TRACE (ACE_TEXT ("Worker::process_message"));
- int msgId;
- ACE_OS::memcpy (&msgId, mb->rd_ptr (), sizeof(int));
- mb->release ();
-
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Started processing message %d\n"),
- msgId));
- ACE_OS::sleep (3);
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Finished processing message %d\n"),
- msgId));
- }
-
- IManager *manager_;
- ACE_thread_t thread_id_;
-};
-
-// Listing 1 code/ch16
-class Manager: public ACE_Task<ACE_MT_SYNCH>, private IManager
-{
-public:
- enum {POOL_SIZE = 5, MAX_TIMEOUT = 5};
-
- Manager ()
- : shutdown_(0), workers_lock_(), workers_cond_(workers_lock_)
- {
- ACE_TRACE (ACE_TEXT ("Manager::Manager"));
- }
-
- int svc (void)
- {
- ACE_TRACE (ACE_TEXT ("Manager::svc"));
-
- ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t) Manager started\n")));
-
- // Create pool.
- create_worker_pool ();
-
- while (!done ())
- {
- ACE_Message_Block *mb = NULL;
- ACE_Time_Value tv ((long)MAX_TIMEOUT);
- tv += ACE_OS::time (0);
-
- // Get a message request.
- if (this->getq (mb, &tv) < 0)
- {
- shut_down ();
- break;
- }
-
- // Choose a worker.
- Worker *worker;
- {
- ACE_GUARD_RETURN (ACE_Thread_Mutex,
- worker_mon, this->workers_lock_, -1);
-
- while (this->workers_.is_empty ())
- workers_cond_.wait ();
-
- this->workers_.dequeue_head (worker);
- }
-
- // Ask the worker to do the job.
- worker->putq (mb);
- }
-
- return 0;
- }
-
- int shut_down (void);
-
- ACE_thread_t thread_id (Worker *worker);
-
- virtual int return_to_work (Worker *worker)
- {
- ACE_GUARD_RETURN (ACE_Thread_Mutex,
- worker_mon, this->workers_lock_, -1);
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Worker %d returning to work.\n"),
- worker->thr_mgr ()->thr_self ()));
- this->workers_.enqueue_tail (worker);
- this->workers_cond_.signal ();
-
- return 0;
- }
-
-private:
- int create_worker_pool (void)
- {
- ACE_GUARD_RETURN (ACE_Thread_Mutex,
- worker_mon,
- this->workers_lock_,
- -1);
- for (int i = 0; i < POOL_SIZE; i++)
- {
- Worker *worker;
- ACE_NEW_RETURN (worker, Worker (this), -1);
- this->workers_.enqueue_tail (worker);
- worker->activate ();
- }
-
- return 0;
- }
-
- int done (void);
-
-private:
- int shutdown_;
- ACE_Thread_Mutex workers_lock_;
- ACE_Condition<ACE_Thread_Mutex> workers_cond_;
- ACE_Unbounded_Queue<Worker* > workers_;
-};
-// Listing 1
-
-int Manager::done (void)
-{
- return (shutdown_ == 1);
-}
-
-int
-Manager::shut_down (void)
-{
- ACE_TRACE (ACE_TEXT ("Manager::shut_down"));
- ACE_Unbounded_Queue<Worker* >::ITERATOR iter =
- this->workers_.begin ();
- Worker **worker_ptr = NULL;
- do
- {
- iter.next (worker_ptr);
- Worker *worker = (*worker_ptr);
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Attempting shutdown of %d\n"),
- thread_id (worker)));
-
- // Send the hangup message.
- ACE_Message_Block *mb;
- ACE_NEW_RETURN
- (mb,
- ACE_Message_Block(0,
- ACE_Message_Block::MB_HANGUP),
- -1);
- worker->putq (mb);
-
- // Wait for the exit.
- worker->wait ();
-
- ACE_ASSERT (worker->msg_queue ()->is_empty ());
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Worker %d shut down.\n)"),
- thread_id (worker)));
- delete worker;
- }
- while (iter.advance ());
-
- shutdown_ = 1;
-
- return 0;
-}
-
-ACE_thread_t
-Manager::thread_id (Worker *worker)
-{
- return worker->thread_id ();
-}
-
-
-int ACE_TMAIN (int, ACE_TCHAR *[])
-{
- Manager tp;
- tp.activate ();
-
- // Wait for a moment every time you send a message.
- ACE_Time_Value tv;
- tv.msec (100);
-
- ACE_Message_Block *mb;
- for (int i = 0; i < 30; i++)
- {
- ACE_NEW_RETURN
- (mb, ACE_Message_Block(sizeof(int)), -1);
-
- ACE_OS::memcpy (mb->wr_ptr (), &i, sizeof(int));
-
- ACE_OS::sleep (tv);
-
- // Add a new work item.
- tp.putq (mb);
- }
-
- ACE_Thread_Manager::instance ()->wait ();
- return 0;
-}
-
-#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
-template class ACE_Condition<ACE_Thread_Mutex>;
-template class ACE_Node<Worker*>;
-template class ACE_Unbounded_Queue<Worker*>;
-template class ACE_Unbounded_Queue_Iterator<Worker*>;
-#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
-#pragma instantiate ACE_Condition<ACE_Thread_Mutex>
-#pragma instantiate ACE_Node<Worker*>
-#pragma instantiate ACE_Unbounded_Queue<Worker*>
-#pragma instantiate ACE_Unbounded_Queue_Iterator<Worker*>
-#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
-
-#else
-#include "ace/OS_main.h"
-#include "ace/OS_NS_stdio.h"
-
-int ACE_TMAIN (int, ACE_TCHAR *[])
-{
- ACE_OS::puts (ACE_TEXT ("This example requires threads."));
- return 0;
-}
-
-#endif /* ACE_HAS_THREADS */
diff --git a/examples/APG/ThreadPools/threadpools.mpc b/examples/APG/ThreadPools/threadpools.mpc
deleted file mode 100644
index 1855f7b40a2..00000000000
--- a/examples/APG/ThreadPools/threadpools.mpc
+++ /dev/null
@@ -1,37 +0,0 @@
-// -*- MPC -*-
-// $Id$
-
-project(Futures) : aceexe {
- exename = Futures
- Source_Files {
- Futures.cpp
- }
-}
-
-project(LF ThreadPool) : aceexe {
- exename = LF_ThreadPool
- Source_Files {
- LF_ThreadPool.cpp
- }
-}
-
-project(Task ThreadPool) : aceexe {
- exename = Task_ThreadPool
- Source_Files {
- Task_ThreadPool.cpp
- }
-}
-
-project(ThreadPool) : aceexe {
- exename = ThreadPool
- Source_Files {
- ThreadPool.cpp
- }
-}
-
-project(TP Reactor) : aceexe {
- exename = TP_Reactor
- Source_Files {
- TP_Reactor.cpp
- }
-}
diff --git a/examples/APG/ThreadPools/threadpools.mwc b/examples/APG/ThreadPools/threadpools.mwc
deleted file mode 100644
index a4c2b0f8085..00000000000
--- a/examples/APG/ThreadPools/threadpools.mwc
+++ /dev/null
@@ -1,6 +0,0 @@
-// -*- MPC -*-
-// $Id$
-
-workspace {
- threadpools.mpc
-}