summaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
authorschmidt <douglascraigschmidt@users.noreply.github.com>1996-12-08 22:07:49 +0000
committerschmidt <douglascraigschmidt@users.noreply.github.com>1996-12-08 22:07:49 +0000
commitf9165af824b8b17874d968f3e57fad576570eb37 (patch)
tree923fca319913deedf33f4f1b95dc25736f288493 /examples
parentce166cd8c8fff5eb5a7ba8ea3bbeb7cf5a788979 (diff)
downloadATCD-f9165af824b8b17874d968f3e57fad576570eb37.tar.gz
foo
Diffstat (limited to 'examples')
-rw-r--r--examples/Threads/Makefile80
-rw-r--r--examples/Threads/auto_event.cpp113
-rw-r--r--examples/Threads/barrier1.cpp84
-rw-r--r--examples/Threads/barrier2.cpp269
-rw-r--r--examples/Threads/cancel.cpp72
-rw-r--r--examples/Threads/future1.cpp420
-rw-r--r--examples/Threads/future2.cpp524
-rw-r--r--examples/Threads/manual_event.cpp108
-rw-r--r--examples/Threads/process_mutex.cpp68
-rw-r--r--examples/Threads/process_semaphore.cpp56
-rw-r--r--examples/Threads/reader_writer.cpp187
-rw-r--r--examples/Threads/recursive_mutex.cpp108
-rw-r--r--examples/Threads/task_four.cpp248
-rw-r--r--examples/Threads/task_one.cpp104
-rw-r--r--examples/Threads/task_three.cpp230
-rw-r--r--examples/Threads/task_two.cpp156
-rw-r--r--examples/Threads/thread_manager.cpp104
-rw-r--r--examples/Threads/thread_pool.cpp214
-rw-r--r--examples/Threads/thread_specific.cpp219
-rw-r--r--examples/Threads/token.cpp76
-rw-r--r--examples/Threads/tss1.cpp164
-rw-r--r--examples/Threads/tss2.cpp252
22 files changed, 3816 insertions, 40 deletions
diff --git a/examples/Threads/Makefile b/examples/Threads/Makefile
index 98019d0accd..3ac8e6ac171 100644
--- a/examples/Threads/Makefile
+++ b/examples/Threads/Makefile
@@ -8,26 +8,26 @@
# Local macros
#----------------------------------------------------------------------------
-BIN = test_auto_event \
- test_barrier1 \
- test_barrier2 \
- test_future1 \
- test_future2 \
- test_manual_event \
- test_process_mutex \
- test_process_semaphore \
- test_reader_writer \
- test_recursive_mutex \
- test_task_one \
- test_task_two \
- test_task_three \
- test_task_four \
- test_thread_manager \
- test_thread_pool \
- test_thread_specific \
- test_tss1 \
- test_tss2 \
- test_token
+BIN = auto_event \
+ barrier1 \
+ barrier2 \
+ future1 \
+ future2 \
+ manual_event \
+ process_mutex \
+ process_semaphore \
+ reader_writer \
+ recursive_mutex \
+ task_one \
+ task_two \
+ task_three \
+ task_four \
+ thread_manager \
+ thread_pool \
+ thread_specific \
+ tss1 \
+ tss2 \
+ token
LSRC = $(addsuffix .cpp,$(BIN))
VLDLIBS = $(LDLIBS:%=%$(VAR))
@@ -57,7 +57,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
# DO NOT DELETE THIS LINE -- g++dep uses it.
# DO NOT PUT ANYTHING AFTER THIS LINE, IT WILL GO AWAY.
-.obj/test_auto_event.o .shobj/test_auto_event.so: test_auto_event.cpp \
+.obj/auto_event.o .shobj/auto_event.so: auto_event.cpp \
$(WRAPPER_ROOT)/ace/Service_Config.h \
$(WRAPPER_ROOT)/ace/Service_Object.h \
$(WRAPPER_ROOT)/ace/Shared_Object.h \
@@ -110,7 +110,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/Reactor.i \
$(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \
$(WRAPPER_ROOT)/ace/Singleton.h
-.obj/test_barrier1.o .shobj/test_barrier1.so: test_barrier1.cpp \
+.obj/barrier1.o .shobj/barrier1.so: barrier1.cpp \
$(WRAPPER_ROOT)/ace/Synch.h \
$(WRAPPER_ROOT)/ace/ACE.h \
$(WRAPPER_ROOT)/ace/OS.h \
@@ -162,7 +162,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/SOCK_Stream.i \
$(WRAPPER_ROOT)/ace/Reactor.i \
$(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h
-.obj/test_barrier2.o .shobj/test_barrier2.so: test_barrier2.cpp \
+.obj/barrier2.o .shobj/barrier2.so: barrier2.cpp \
$(WRAPPER_ROOT)/ace/Task.h \
$(WRAPPER_ROOT)/ace/Service_Object.h \
$(WRAPPER_ROOT)/ace/Shared_Object.h \
@@ -218,7 +218,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/SOCK_Stream.i \
$(WRAPPER_ROOT)/ace/Reactor.i \
$(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h
-.obj/test_future1.o .shobj/test_future1.so: test_future1.cpp \
+.obj/future1.o .shobj/future1.so: future1.cpp \
$(WRAPPER_ROOT)/ace/Task.h \
$(WRAPPER_ROOT)/ace/Service_Object.h \
$(WRAPPER_ROOT)/ace/Shared_Object.h \
@@ -256,7 +256,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/Method_Object.h \
$(WRAPPER_ROOT)/ace/Activation_Queue.h \
$(WRAPPER_ROOT)/ace/Auto_Ptr.h
-.obj/test_future2.o .shobj/test_future2.so: test_future2.cpp \
+.obj/future2.o .shobj/future2.so: future2.cpp \
$(WRAPPER_ROOT)/ace/Task.h \
$(WRAPPER_ROOT)/ace/Service_Object.h \
$(WRAPPER_ROOT)/ace/Shared_Object.h \
@@ -294,7 +294,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/Method_Object.h \
$(WRAPPER_ROOT)/ace/Activation_Queue.h \
$(WRAPPER_ROOT)/ace/Auto_Ptr.h
-.obj/test_manual_event.o .shobj/test_manual_event.so: test_manual_event.cpp \
+.obj/manual_event.o .shobj/manual_event.so: manual_event.cpp \
$(WRAPPER_ROOT)/ace/Service_Config.h \
$(WRAPPER_ROOT)/ace/Service_Object.h \
$(WRAPPER_ROOT)/ace/Shared_Object.h \
@@ -346,7 +346,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/SOCK_Stream.i \
$(WRAPPER_ROOT)/ace/Reactor.i \
$(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h
-.obj/test_process_mutex.o .shobj/test_process_mutex.so: test_process_mutex.cpp \
+.obj/process_mutex.o .shobj/process_mutex.so: process_mutex.cpp \
$(WRAPPER_ROOT)/ace/Synch.h \
$(WRAPPER_ROOT)/ace/ACE.h \
$(WRAPPER_ROOT)/ace/OS.h \
@@ -367,7 +367,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/Event_Handler.h \
$(WRAPPER_ROOT)/ace/Signal.h \
$(WRAPPER_ROOT)/ace/Set.h
-.obj/test_process_semaphore.o .shobj/test_process_semaphore.so: test_process_semaphore.cpp \
+.obj/process_semaphore.o .shobj/process_semaphore.so: process_semaphore.cpp \
$(WRAPPER_ROOT)/ace/Synch.h \
$(WRAPPER_ROOT)/ace/ACE.h \
$(WRAPPER_ROOT)/ace/OS.h \
@@ -388,7 +388,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/Event_Handler.h \
$(WRAPPER_ROOT)/ace/Signal.h \
$(WRAPPER_ROOT)/ace/Set.h
-.obj/test_reader_writer.o .shobj/test_reader_writer.so: test_reader_writer.cpp \
+.obj/reader_writer.o .shobj/reader_writer.so: reader_writer.cpp \
$(WRAPPER_ROOT)/ace/Synch.h \
$(WRAPPER_ROOT)/ace/ACE.h \
$(WRAPPER_ROOT)/ace/OS.h \
@@ -410,7 +410,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/Thread.h \
$(WRAPPER_ROOT)/ace/Thread_Manager.h \
$(WRAPPER_ROOT)/ace/Get_Opt.h
-.obj/test_recursive_mutex.o .shobj/test_recursive_mutex.so: test_recursive_mutex.cpp \
+.obj/recursive_mutex.o .shobj/recursive_mutex.so: recursive_mutex.cpp \
$(WRAPPER_ROOT)/ace/Service_Config.h \
$(WRAPPER_ROOT)/ace/Service_Object.h \
$(WRAPPER_ROOT)/ace/Shared_Object.h \
@@ -463,7 +463,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/Reactor.i \
$(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \
$(WRAPPER_ROOT)/ace/Get_Opt.h
-.obj/test_task_one.o .shobj/test_task_one.so: test_task_one.cpp \
+.obj/task_one.o .shobj/task_one.so: task_one.cpp \
$(WRAPPER_ROOT)/ace/Task.h \
$(WRAPPER_ROOT)/ace/Service_Object.h \
$(WRAPPER_ROOT)/ace/Shared_Object.h \
@@ -519,7 +519,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/SOCK_Stream.i \
$(WRAPPER_ROOT)/ace/Reactor.i \
$(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h
-.obj/test_task_two.o .shobj/test_task_two.so: test_task_two.cpp \
+.obj/task_two.o .shobj/task_two.so: task_two.cpp \
$(WRAPPER_ROOT)/ace/Task.h \
$(WRAPPER_ROOT)/ace/Service_Object.h \
$(WRAPPER_ROOT)/ace/Shared_Object.h \
@@ -575,7 +575,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/SOCK_Stream.i \
$(WRAPPER_ROOT)/ace/Reactor.i \
$(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h
-.obj/test_task_three.o .shobj/test_task_three.so: test_task_three.cpp \
+.obj/task_three.o .shobj/task_three.so: task_three.cpp \
$(WRAPPER_ROOT)/ace/Reactor.h \
$(WRAPPER_ROOT)/ace/Handle_Set.h \
$(WRAPPER_ROOT)/ace/ACE.h \
@@ -631,7 +631,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/Task_T.h \
$(WRAPPER_ROOT)/ace/Message_Queue.h \
$(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h
-.obj/test_task_four.o .shobj/test_task_four.so: test_task_four.cpp \
+.obj/task_four.o .shobj/task_four.so: task_four.cpp \
$(WRAPPER_ROOT)/ace/Task.h \
$(WRAPPER_ROOT)/ace/Service_Object.h \
$(WRAPPER_ROOT)/ace/Shared_Object.h \
@@ -687,7 +687,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/SOCK_Stream.i \
$(WRAPPER_ROOT)/ace/Reactor.i \
$(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h
-.obj/test_thread_manager.o .shobj/test_thread_manager.so: test_thread_manager.cpp \
+.obj/thread_manager.o .shobj/thread_manager.so: thread_manager.cpp \
$(WRAPPER_ROOT)/ace/Service_Config.h \
$(WRAPPER_ROOT)/ace/Service_Object.h \
$(WRAPPER_ROOT)/ace/Shared_Object.h \
@@ -739,7 +739,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/SOCK_Stream.i \
$(WRAPPER_ROOT)/ace/Reactor.i \
$(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h
-.obj/test_thread_pool.o .shobj/test_thread_pool.so: test_thread_pool.cpp \
+.obj/thread_pool.o .shobj/thread_pool.so: thread_pool.cpp \
$(WRAPPER_ROOT)/ace/Task.h \
$(WRAPPER_ROOT)/ace/Service_Object.h \
$(WRAPPER_ROOT)/ace/Shared_Object.h \
@@ -795,7 +795,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/SOCK_Stream.i \
$(WRAPPER_ROOT)/ace/Reactor.i \
$(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h
-.obj/test_thread_specific.o .shobj/test_thread_specific.so: test_thread_specific.cpp \
+.obj/thread_specific.o .shobj/thread_specific.so: thread_specific.cpp \
$(WRAPPER_ROOT)/ace/Service_Config.h \
$(WRAPPER_ROOT)/ace/Service_Object.h \
$(WRAPPER_ROOT)/ace/Shared_Object.h \
@@ -847,7 +847,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/SOCK_Stream.i \
$(WRAPPER_ROOT)/ace/Reactor.i \
$(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h
-.obj/test_tss1.o .shobj/test_tss1.so: test_tss1.cpp \
+.obj/tss1.o .shobj/tss1.so: tss1.cpp \
$(WRAPPER_ROOT)/ace/Service_Config.h \
$(WRAPPER_ROOT)/ace/Service_Object.h \
$(WRAPPER_ROOT)/ace/Shared_Object.h \
@@ -903,7 +903,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/Task_T.h \
$(WRAPPER_ROOT)/ace/Message_Queue.h \
$(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h
-.obj/test_tss2.o .shobj/test_tss2.so: test_tss2.cpp \
+.obj/tss2.o .shobj/tss2.so: tss2.cpp \
$(WRAPPER_ROOT)/ace/Task.h \
$(WRAPPER_ROOT)/ace/Service_Object.h \
$(WRAPPER_ROOT)/ace/Shared_Object.h \
@@ -938,7 +938,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/Mem_Map.h \
$(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \
$(WRAPPER_ROOT)/ace/Token.h
-.obj/test_token.o .shobj/test_token.so: test_token.cpp \
+.obj/token.o .shobj/token.so: token.cpp \
$(WRAPPER_ROOT)/ace/Token.h \
$(WRAPPER_ROOT)/ace/Synch.h \
$(WRAPPER_ROOT)/ace/ACE.h \
diff --git a/examples/Threads/auto_event.cpp b/examples/Threads/auto_event.cpp
new file mode 100644
index 00000000000..4f83d50db71
--- /dev/null
+++ b/examples/Threads/auto_event.cpp
@@ -0,0 +1,113 @@
+// $Id$
+
+// This test shows the use of an ACE_Auto_Event as a signaling
+// mechanism. Two threads are created (one a reader, the other a
+// writer). The reader waits till the writer has completed
+// calculations. Upon waking up the reader prints the data calculated
+// by the writer. The writer thread calculates the value and signals
+// the reader when the calculation completes.
+
+#include "ace/Service_Config.h"
+#include "ace/Synch.h"
+#include "ace/Singleton.h"
+#include "ace/Thread_Manager.h"
+
+#if defined (ACE_HAS_THREADS)
+// Shared event between reader and writer. The ACE_Thread_Mutex is
+// necessary to make sure that only one ACE_Auto_Event is created.
+// The default constructor for ACE_Auto_Event sets it initially into
+// the non-signaled state.
+
+typedef ACE_Singleton <ACE_Auto_Event, ACE_Thread_Mutex> EVENT;
+
+// work time for writer
+static int work_time;
+
+// Reader thread.
+static void *
+reader (void *arg)
+{
+ // Shared data via a reference.
+ int& data = *(int *) arg;
+
+ ACE_Thread_Control tc (ACE_Service_Config::thr_mgr ());
+
+ // Wait for writer to complete.
+
+ ACE_DEBUG ((LM_DEBUG, "(%t) reader: waiting...... \n"));
+
+ if (EVENT::instance ()->wait () == -1)
+ {
+ ACE_ERROR ((LM_ERROR, "thread wait failed"));
+ ACE_OS::exit (0);
+ }
+
+ // Read shared data.
+ ACE_DEBUG ((LM_DEBUG, "(%t) reader: value of data is: %d \n", data));
+
+ return 0;
+}
+
+// Writer thread.
+static void *
+writer (void *arg)
+{
+ int& data = *(int *) arg;
+
+ ACE_Thread_Control tc (ACE_Service_Config::thr_mgr ());
+
+ // Calculate (work).
+ ACE_DEBUG ((LM_DEBUG, "(%t) writer: working for %d secs\n", work_time));
+ ACE_OS::sleep (work_time);
+
+ // Write shared data.
+ data = 42;
+
+ // Wake up reader.
+ ACE_DEBUG ((LM_DEBUG, "(%t) writer: calculation complete, waking reader\n"));
+
+ if (EVENT::instance ()->signal () == -1)
+ {
+ ACE_ERROR ((LM_ERROR, "thread wait failed"));
+ ACE_OS::exit (0);
+ }
+
+ return 0;
+}
+
+int
+main (int argc, char **argv)
+{
+ // Shared data: set by writer, read by reader.
+ int data;
+
+ // Work time for writer.
+ work_time = argc == 2 ? atoi (argv[1]) : 5;
+
+ // threads manager
+ ACE_Thread_Manager& tm = *ACE_Service_Config::thr_mgr ();
+
+ // Create reader thread.
+ if (tm.spawn (reader, (void *) &data) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "thread create for reader failed"), -1);
+
+ // Create writer thread.
+ if (tm.spawn (writer, (void *) &data) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "thread create for writer failed"), -1);
+
+ // Wait for both.
+ if (tm.wait () == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "thread wait failed"), -1);
+ else
+ ACE_DEBUG ((LM_ERROR, "graceful exit\n"));
+
+ return 0;
+}
+#else
+int
+main (int, char *[])
+{
+ ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n"));
+ return 0;
+}
+#endif /* ACE_HAS_THREADS */
diff --git a/examples/Threads/barrier1.cpp b/examples/Threads/barrier1.cpp
new file mode 100644
index 00000000000..6b213819ca0
--- /dev/null
+++ b/examples/Threads/barrier1.cpp
@@ -0,0 +1,84 @@
+// This test program illustrates how the ACE barrier synchronization
+// $Id$
+
+// mechanisms work.
+
+
+#include "ace/Synch.h"
+#include "ace/Thread_Manager.h"
+#include "ace/Service_Config.h"
+
+#if defined (ACE_HAS_THREADS)
+
+struct Tester_Args
+ // = TITLE
+ // These arguments are passed into each test thread.
+{
+ Tester_Args (ACE_Barrier &tb, int i)
+ : tester_barrier_ (tb),
+ n_iterations_ (i) {}
+
+ ACE_Barrier &tester_barrier_;
+ // Reference to the tester barrier. This controls each miteration of
+ // the tester function running in every thread.
+
+ int n_iterations_;
+ // Number of iterations to run.
+};
+
+// Iterate <n_iterations> time printing off a message and "waiting"
+// for all other threads to complete this iteration.
+
+static void *
+tester (Tester_Args *args)
+{
+ // Keeps track of thread exit.
+ ACE_Thread_Control tc (ACE_Service_Config::thr_mgr ());
+
+ for (int iterations = 1;
+ iterations <= args->n_iterations_;
+ iterations++)
+ {
+ ACE_DEBUG ((LM_DEBUG, "(%t) in iteration %d\n", iterations));
+
+ // Block until all other threads have waited, then continue.
+ args->tester_barrier_.wait ();
+ }
+
+ return 0;
+}
+
+// Default number of threads to spawn.
+static const int DEFAULT_ITERATIONS = 5;
+
+int
+main (int argc, char *argv[])
+{
+ ACE_Service_Config daemon (argv[0]);
+
+ int n_threads = argc > 1 ? ACE_OS::atoi (argv[1]) : ACE_DEFAULT_THREADS;
+ int n_iterations = argc > 2 ? ACE_OS::atoi (argv[2]) : DEFAULT_ITERATIONS;
+
+ ACE_Barrier tester_barrier (n_threads);
+
+ Tester_Args args (tester_barrier, n_iterations);
+
+ if (ACE_Service_Config::thr_mgr ()->spawn_n
+ (n_threads, ACE_THR_FUNC (tester),
+ (void *) &args, THR_NEW_LWP | THR_DETACHED) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "spawn_n"), 1);
+
+ // Wait for all the threads to reach their exit point.
+ ACE_Service_Config::thr_mgr ()->wait ();
+
+ ACE_DEBUG ((LM_DEBUG, "(%t) done\n"));
+ return 0;
+}
+#else
+int
+main (int, char *[])
+{
+ ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n"));
+ return 0;
+}
+#endif /* ACE_HAS_THREADS */
diff --git a/examples/Threads/barrier2.cpp b/examples/Threads/barrier2.cpp
new file mode 100644
index 00000000000..30190ace443
--- /dev/null
+++ b/examples/Threads/barrier2.cpp
@@ -0,0 +1,269 @@
+// $Id$
+
+// generic_worker_task.cpp
+//
+// This test program illustrates how the ACE task workers/barrier
+// synchronization mechanisms work in conjunction with the ACE_Task
+// and the ACE_Thread_Manager. The manual flag not set simulates
+// user input, if set input comes from stdin until RETURN only is
+// entered which stops all workers via a message block of length
+// 0. This is an alernative shutdown of workers compared to queue
+// deactivate. The delay_put flag simulates a delay between the
+// shutdown puts. All should work with this flag disabled! The
+// BARRIER_TYPE is supposed to enable/disable barrier sync on each svc
+// a worker has done.
+
+#include <iostream.h>
+#include "ace/Task.h"
+#include "ace/Service_Config.h"
+
+#if defined (ACE_HAS_THREADS)
+
+#define BARRIER_TYPE ACE_Null_Barrier
+//#define BARRIER_TYPE ACE_Barrier
+//#ifdef delay_put
+//#define manual
+
+template <class BARRIER>
+class Worker_Task : public ACE_Task<ACE_MT_SYNCH>
+{
+public:
+
+ Worker_Task (ACE_Thread_Manager *thr_mgr,
+ int n_threads,
+ int inp_serialize = 1);
+
+ virtual int Producer (void);
+ // produce input for workers
+
+ virtual int input (ACE_Message_Block *mb);
+ // Fill one message block via a certain input strategy.
+
+ virtual int output (ACE_Message_Block *mb);
+ // Forward one message block via a certain output strategy to the
+ // next task if any.
+
+ virtual int service (ACE_Message_Block *mb, int iter);
+ // Perform one message block dependant service.
+
+private:
+ virtual int put (ACE_Message_Block *mb, ACE_Time_Value *tv=0);
+
+ virtual int svc (void);
+ // Iterate <n_iterations> time printing off a message and "waiting"
+ // for all other threads to complete this iteration.
+
+ // = Not needed for this test.
+ virtual int open (void *) { return 0; }
+ virtual int close (u_long) {ACE_DEBUG ((LM_DEBUG,"(%t) in close of worker\n")); return 0; }
+
+ int nt_;
+ // Number of worker threads to run.
+ int inp_serialize_;
+
+ BARRIER barrier_;
+};
+
+template <class BARRIER>
+Worker_Task<BARRIER>::Worker_Task (ACE_Thread_Manager *thr_mgr,
+ int n_threads,
+ int inp_serialize)
+ : ACE_Task<ACE_MT_SYNCH> (thr_mgr),
+ barrier_ (n_threads)
+{
+ nt_ = n_threads;
+ // Create worker threads.
+ inp_serialize_ = inp_serialize;
+
+ // Use the task's message queue for serialization (default) or run
+ // service in the context of the caller thread.
+
+ if (nt_ > 0 && inp_serialize == 1)
+ if (this->activate (THR_NEW_LWP, n_threads) == -1)
+ ACE_ERROR ((LM_ERROR, "%p\n", "activate failed"));
+}
+
+// Simply enqueue the Message_Block into the end of the queue.
+
+template <class BARRIER> int
+Worker_Task<BARRIER>::put (ACE_Message_Block *mb, ACE_Time_Value *tv)
+{
+ int result;
+ if (this->inp_serialize_)
+ result = this->putq (mb, tv);
+ else
+ {
+ static int iter = 0;
+ result = this->service (mb, iter++);
+
+ if (this->output (mb) < 0)
+ ACE_DEBUG ((LM_DEBUG, "(%t) output not connected!\n"));
+
+ delete mb;
+ }
+ return result;
+}
+
+template <class BARRIER> int
+Worker_Task<BARRIER>::service (ACE_Message_Block *mb, int iter)
+{
+ int length = mb->length ();
+
+ if (length > 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,"(%t) in iteration %d len=%d text got:\n",iter,length));
+ ACE_OS::write (ACE_STDOUT, mb->rd_ptr (), length);
+ ACE_DEBUG ((LM_DEBUG,"\n"));
+ }
+ return 0;
+}
+
+// Iterate <n_iterations> time printing off a message and "waiting"
+// for all other threads to complete this iteration.
+
+template <class BARRIER> int
+Worker_Task<BARRIER>::svc (void)
+{
+ // Note that the ACE_Task::svc_run () method automatically adds us
+ // to the Thread_Manager when the thread begins.
+
+ // Keep looping, reading a message out of the queue, until we get a
+ // message with a length == 0, which signals us to quit.
+
+ for (int iter = 1; ;iter++)
+ {
+ ACE_Message_Block *mb = 0;
+
+ int result = this->getq (mb);
+
+ if (result == -1)
+ {
+ ACE_ERROR ((LM_ERROR,
+ "(%t) in iteration %d\n", "error waiting for message in iteration", iter));
+ break;
+ }
+
+ int length = mb->length ();
+ this->service (mb,iter);
+
+ if (length == 0)
+ {
+ ACE_DEBUG ((LM_DEBUG, "(%t) in iteration %d got quit, exit!\n", iter));
+ delete mb;
+ break;
+ }
+
+ this->barrier_.wait ();
+ this->output (mb);
+
+ delete mb;
+ }
+
+ // Note that the ACE_Task::svc_run () method automatically removes
+ // us from the Thread_Manager when the thread exits.
+
+ return 0;
+}
+
+template <class BARRIER> int
+Worker_Task<BARRIER>::Producer (void)
+{
+ // Keep reading stdin, until we reach EOF.
+
+ for (;;)
+ {
+ // Allocate a new message.
+ ACE_Message_Block *mb = new ACE_Message_Block (BUFSIZ);
+
+ if (this->input (mb) == -1)
+ return -1;
+ }
+
+ return 0;
+}
+
+template <class BARRIER>int
+Worker_Task<BARRIER>::output (ACE_Message_Block *mb)
+{
+ return this->put_next (mb);
+}
+
+template <class BARRIER>int
+Worker_Task<BARRIER>::input (ACE_Message_Block *mb)
+{
+ ACE_Message_Block *mb1;
+
+#ifndef manual
+ static int l= 0;
+ char str[]="kalle";
+ strcpy (mb->rd_ptr (),str);
+ int n=strlen (str);
+ if (l==1000)
+ n=1;
+ l++;
+ if (l==0 || (l%100 == 0)) ACE_OS::sleep (5);
+ if (n <= 1)
+#else
+ ACE_DEBUG ((LM_DEBUG,"(%t) press chars and enter to put a new message into task queue ...\n"));
+ if ((n = read (0, mb->rd_ptr (), mb->size ())) <= 1)
+#endif // manual
+ {
+ // Send a shutdown message to the waiting threads and exit.
+ // cout << "\nvor loop, dump of task msg queue:\n" << endl;
+ // this->msg_queue ()->dump ();
+ for (int i=0;i<nt_;i++)
+ {
+ ACE_DEBUG ((LM_DEBUG,"(%t) eof, sending block for thread=%d\n",i+1));
+ mb1 = new ACE_Message_Block (2);
+ mb1->length (0);
+ if (this->put (mb1) == -1)
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n", "put"));
+#ifdef delay_put
+ ACE_OS::sleep (1); // this sleep helps to shutdown correctly -> was an error!
+#endif /* delay_put */
+ }
+ // cout << "\nnach loop, dump of task msg queue:\n" << endl;
+ // this->msg_queue ()->dump ();
+ return (-1);
+ }
+ else
+ {
+ // Send a normal message to the waiting threads and continue producing.
+ mb->wr_ptr (n);
+ if (this->put (mb) == -1)
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n", "put"));
+ }
+ return 0;
+}
+
+int
+main (int argc, char *argv[])
+{
+ int n_threads = argc > 1 ? ACE_OS::atoi (argv[1]) : ACE_DEFAULT_THREADS;
+
+ ACE_DEBUG ((LM_DEBUG,"(%t) worker threads running=%d\n",n_threads));
+
+
+ Worker_Task<BARRIER_TYPE> *worker_task =
+ new Worker_Task<BARRIER_TYPE> (ACE_Service_Config::thr_mgr (),
+ /*n_threads*/ 0,0);
+
+ worker_task->Producer ();
+
+ // Wait for all the threads to reach their exit point.
+ ACE_DEBUG ((LM_DEBUG,"(%t) waiting with thread manager ...\n"));
+ ACE_Service_Config::thr_mgr ()->wait ();
+ ACE_DEBUG ((LM_DEBUG,"(%t) delete worker task ...\n"));
+
+ delete worker_task;
+ ACE_DEBUG ((LM_DEBUG,"(%t) done correct!\n"));
+ return 0;
+}
+#else
+int
+main (int, char *[])
+{
+ ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n"));
+ return 0;
+}
+#endif /* ACE_HAS_THREADS */
diff --git a/examples/Threads/cancel.cpp b/examples/Threads/cancel.cpp
new file mode 100644
index 00000000000..a9d12bea579
--- /dev/null
+++ b/examples/Threads/cancel.cpp
@@ -0,0 +1,72 @@
+// Test out the cooperative thread cancellation mechanisms provided by
+// $Id$
+
+// the ACE_Thread_Manager.
+
+#include "ace/Service_Config.h"
+#include "ace/Thread_Manager.h"
+
+#if defined (ACE_HAS_THREADS)
+
+static void *
+worker (int iterations)
+{
+ for (int i = 0; i < iterations; i++)
+ {
+ if ((i % 10) == 0
+ && (ACE_Service_Config::thr_mgr ()->testcancel (ACE_Thread::self ()) != 0))
+ {
+ ACE_DEBUG ((LM_DEBUG, "(%t) has been cancelled before iteration!\n", i));
+ break;
+ }
+ }
+
+ return 0;
+}
+
+static const int DEFAULT_THREADS = ACE_DEFAULT_THREADS;
+static const int DEFAULT_ITERATIONS = 100000;
+
+int
+main (int argc, char *argv[])
+{
+ ACE_Service_Config daemon;
+
+ daemon.open (argv[0]);
+
+ int n_threads = argc > 1 ? ACE_OS::atoi (argv[1]) : DEFAULT_THREADS;
+ int n_iterations = argc > 2 ? ACE_OS::atoi (argv[2]) : DEFAULT_ITERATIONS;
+
+ ACE_Thread_Manager *thr_mgr = ACE_Service_Config::thr_mgr ();
+
+ int grp_id = thr_mgr->spawn_n (n_threads, ACE_THR_FUNC (worker),
+ (void *) n_iterations,
+ THR_NEW_LWP | THR_DETACHED);
+
+ // Wait for 2 seconds and then suspend every thread in the group.
+ ACE_OS::sleep (2);
+ thr_mgr->suspend_grp (grp_id);
+
+ // Wait for 2 more seconds and then resume every thread in the
+ // group.
+ ACE_OS::sleep (ACE_Time_Value (2));
+ thr_mgr->resume_grp (grp_id);
+
+ // Wait for 2 more seconds and then send a SIGINT to every thread in
+ // the group.
+ ACE_OS::sleep (ACE_Time_Value (2));
+ thr_mgr->kill_grp (grp_id, SIGINT);
+
+ // Wait for 2 more seconds and then exit (which should kill all the
+ // threads)!
+ ACE_OS::sleep (ACE_Time_Value (2));
+
+ return 0;
+}
+#else
+int
+main (int, char *[])
+{
+ ACE_ERROR_RETURN ((LM_ERROR, "threads not supported on this platform\n"), -1);
+}
+#endif /* ACE_HAS_THREADS */
diff --git a/examples/Threads/future1.cpp b/examples/Threads/future1.cpp
new file mode 100644
index 00000000000..ea295e487e1
--- /dev/null
+++ b/examples/Threads/future1.cpp
@@ -0,0 +1,420 @@
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// tests
+//
+// = FILENAME
+// Test_Future.cpp
+//
+// = DESCRIPTION
+// This example tests the ACE Future.
+//
+// = AUTHOR
+// Andres Kruse <Andres.Kruse@cern.ch> and Douglas C. Schmidt
+// <schmidt@cs.wustl.edu>
+//
+// ============================================================================
+
+#include <math.h>
+#include "ace/Task.h"
+
+#include "ace/Synch.h"
+#include "ace/Message_Queue.h"
+#include "ace/Future.h"
+#include "ace/Method_Object.h"
+#include "ace/Activation_Queue.h"
+#include "ace/Auto_Ptr.h"
+
+#if defined (ACE_HAS_THREADS)
+
+typedef ACE_Atomic_Op<ACE_Thread_Mutex, u_long> ATOMIC_INT;
+
+// a counter for the tasks..
+static ATOMIC_INT task_count (0);
+
+// a counter for the futures..
+static ATOMIC_INT future_count (0);
+static ATOMIC_INT future_no (0);
+
+// a counter for the capsules..
+static ATOMIC_INT capsule_count (0);
+static ATOMIC_INT capsule_no (0);
+
+// a counter for the method objects...
+static ATOMIC_INT methodobject_count (0);
+static ATOMIC_INT methodobject_no (0);
+
+class Scheduler : public ACE_Task<ACE_MT_SYNCH>
+ // = TITLE
+ // Active Object Scheduler.
+{
+ friend class Method_ObjectWork;
+public:
+ Scheduler (const char *, Scheduler * = 0);
+ ~Scheduler (void);
+
+ virtual int open (void *args = 0);
+ virtual int close (u_long flags = 0);
+ virtual int put (ACE_Message_Block *mb, ACE_Time_Value *tv = 0);
+ virtual int svc (void);
+
+ ACE_Future<double> work (double param, int count);
+ ACE_Future<const char*> name (void);
+ void end (void);
+
+ double work_i (double, int);
+ const char *name_i (void);
+
+private:
+ char *name_;
+ ACE_Activation_Queue activation_queue_;
+ Scheduler *scheduler_;
+
+};
+
+class Method_Object_work : public ACE_Method_Object
+ // = TITLE
+ // Reification of the <work> method.
+{
+public:
+ Method_Object_work (Scheduler *, double, int, ACE_Future<double> &);
+ ~Method_Object_work (void);
+ virtual int call (void);
+
+private:
+ Scheduler *scheduler_;
+ double param_;
+ int count_;
+ ACE_Future<double> future_result_;
+};
+
+Method_Object_work::Method_Object_work (Scheduler* new_Scheduler,
+ double new_param,
+ int new_count,
+ ACE_Future<double> &new_result)
+ : scheduler_ (new_Scheduler),
+ param_ (new_param),
+ count_ (new_count),
+ future_result_ (new_result)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) Method_Object_work created\n"));
+}
+
+Method_Object_work::~Method_Object_work (void)
+{
+ ACE_DEBUG ((LM_DEBUG, "(%t) Method_Object_work will be deleted.\n"));
+}
+
+
+int
+Method_Object_work::call (void)
+{
+ return this->future_result_.set (this->scheduler_->work_i (this->param_, this->count_));
+}
+
+class Method_Object_name : public ACE_Method_Object
+ // = TITLE
+ // Reification of the <name> method.
+{
+public:
+ Method_Object_name (Scheduler *, ACE_Future<const char*> &);
+ ~Method_Object_name (void);
+ virtual int call (void);
+
+private:
+ Scheduler *scheduler_;
+ ACE_Future<const char*> future_result_;
+};
+
+Method_Object_name::Method_Object_name (Scheduler *new_scheduler,
+ ACE_Future<const char*> &new_result)
+ : scheduler_ (new_scheduler),
+ future_result_ (new_result)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) Method_Object_name created\n"));
+};
+
+Method_Object_name::~Method_Object_name (void)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) Method_Object_name will be deleted.\n"));
+}
+
+int
+Method_Object_name::call (void)
+{
+ return future_result_.set (scheduler_->name_i ());
+}
+
+class Method_Object_end : public ACE_Method_Object
+ // = TITLE
+ // Reification of the <end> method.
+{
+public:
+ Method_Object_end (Scheduler *new_Scheduler): scheduler_ (new_Scheduler) {}
+ ~Method_Object_end (void) {}
+ virtual int call (void) { this->scheduler_->close (); return -1; }
+
+private:
+ Scheduler *scheduler_;
+};
+
+// constructor
+Scheduler::Scheduler (const char *newname, Scheduler *new_Scheduler)
+{
+ ACE_NEW (this->name_, char[ACE_OS::strlen (newname) + 1]);
+ ACE_OS::strcpy ((char *) this->name_, newname);
+ this->scheduler_ = new_Scheduler;
+ ACE_DEBUG ((LM_DEBUG, "(%t) Scheduler %s created\n", this->name_));
+}
+
+// Destructor
+Scheduler::~Scheduler (void)
+{
+ ACE_DEBUG ((LM_DEBUG, "(%t) Scheduler %s will be destroyed\n", this->name_));
+}
+
+// open
+int
+Scheduler::open (void *)
+{
+ task_count++;
+ ACE_DEBUG ((LM_DEBUG, "(%t) Scheduler %s open\n", this->name_));
+ return this->activate (THR_BOUND);
+}
+
+// close
+int
+Scheduler::close (u_long)
+{
+ ACE_DEBUG ((LM_DEBUG, "(%t) Scheduler %s close\n", this->name_));
+ task_count--;
+ return 0;
+}
+
+// put... ??
+int
+Scheduler::put (ACE_Message_Block *, ACE_Time_Value *)
+{
+ return 0;
+}
+
+// service..
+int
+Scheduler::svc (void)
+{
+ for (;;)
+ {
+ // Dequeue the next method object (we use an auto pointer in
+ // case an exception is thrown in the <call>).
+ auto_ptr<ACE_Method_Object> mo (this->activation_queue_.dequeue ());
+
+ ACE_DEBUG ((LM_DEBUG, "(%t) calling method object\n"));
+ // Call it.
+ if (mo->call () == -1)
+ break;
+ // Destructor automatically deletes it.
+ }
+
+ /* NOTREACHED */
+ return 0;
+}
+
+void
+Scheduler::end (void)
+{
+ this->activation_queue_.enqueue (new Method_Object_end (this));
+}
+
+
+// Here's where the Work takes place.
+double
+Scheduler::work_i (double param,
+ int count)
+{
+ double x = 0.0, y = 0.0;
+
+ // @@ We should probably do something fun here, like compute the
+ // Fibonacci sequence or something.
+
+ for (int j = 0; j < count; j++)
+ {
+ x = x + param;
+ y = y + double(::sin (x));
+ }
+
+ return y;
+}
+
+const char *
+Scheduler::name_i (void)
+{
+ char *the_name;
+
+ the_name = new char[ACE_OS::strlen (this->name_) + 1];
+ ACE_OS::strcpy (the_name, this->name_);
+
+ return the_name;
+}
+
+ACE_Future<const char *>
+Scheduler::name (void)
+{
+ if (this->scheduler_)
+ // Delegate to the Scheduler.
+ return this->scheduler_->name ();
+ else
+ {
+ ACE_Future<const char*> new_future;
+
+ // @@ What happens if new fails here?
+ this->activation_queue_.enqueue
+ (new Method_Object_name (this, new_future));
+
+ return new_future;
+ }
+}
+
+ACE_Future<double>
+Scheduler::work (double newparam, int newcount)
+{
+ if (this->scheduler_) {
+ return this->scheduler_->work (newparam, newcount);
+ }
+ else {
+ ACE_Future<double> new_future;
+
+ this->activation_queue_.enqueue
+ (new Method_Object_work (this, newparam, newcount, new_future));
+ return new_future;
+ }
+}
+
+// @@ These values should be set by the command line options!
+
+// Total number of iterations to <work>
+static int n_iterations = 50000;
+
+// Total number of loops.
+static int n_loops = 100;
+
+int
+main (int, char *[])
+{
+ Scheduler *andres, *peter, *helmut, *matias;
+
+ // Create active objects..
+ // @@ Should "open" be subsumed within the constructor of
+ // Scheduler()?
+ andres = new Scheduler ("andres");
+ andres->open ();
+ peter = new Scheduler ("peter");
+ peter->open ();
+ helmut = new Scheduler ("helmut");
+ helmut->open ();
+
+ // Matias passes all asynchronous method calls on to Andres...
+ matias = new Scheduler ("matias", andres);
+ matias->open ();
+
+ for (int i = 0; i < n_loops; i++)
+ {
+ {
+ ACE_Future<double> fresulta, fresultb, fresultc, fresultd, fresulte;
+ ACE_Future<const char*> fname;
+
+ ACE_DEBUG ((LM_DEBUG, "(%t) going to do a non-blocking call\n"));
+
+ fresulta = andres->work (0.01, 100 + (n_iterations * (i % 2)));
+ fresultb = peter->work (0.01, 100 + (n_iterations * (i % 2)));
+ fresultc = helmut->work (0.01, 100 + (n_iterations * (i % 2)));
+ fresultd = matias->work (0.02, 100 + (n_iterations * (i % 2)));
+ fname = andres->name ();
+
+ // see if the result is available...
+ if (fresulta.ready ())
+ ACE_DEBUG ((LM_DEBUG, "(%t) wow.. work is ready.....\n"));
+
+ ACE_DEBUG ((LM_DEBUG, "(%t) non-blocking call done... now blocking...\n"));
+
+ // Save the result of fresulta.
+
+ fresulte = fresulta;
+
+ if (i % 3 == 0)
+ {
+ // Every 3rd time... disconnect the futures...
+ // but "fresulte" should still contain the result...
+ fresulta.cancel (10.0);
+ fresultb.cancel (20.0);
+ fresultc.cancel (30.0);
+ fresultd.cancel (40.0);
+ }
+
+ double resulta = 0, resultb = 0, resultc = 0, resultd = 0, resulte = 0;
+
+ fresulta.get (resulta);
+ fresultb.get (resultb);
+ fresultc.get (resultc);
+ fresultd.get (resultd);
+ fresulte.get (resulte);
+
+ ACE_DEBUG ((LM_DEBUG, "(%t) result a %f\n", resulte));
+ ACE_DEBUG ((LM_DEBUG, "(%t) result b %f\n", resulta));
+ ACE_DEBUG ((LM_DEBUG, "(%t) result c %f\n", resultb));
+ ACE_DEBUG ((LM_DEBUG, "(%t) result d %f\n", resultc));
+ ACE_DEBUG ((LM_DEBUG, "(%t) result e %f\n", resultd));
+
+ const char *name;
+
+ fname.get (name);
+
+ ACE_DEBUG ((LM_DEBUG, "(%t) name %s\n", name));
+
+ }
+
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) task_count %d future_count %d capsule_count %d methodobject_count %d\n",
+ (u_long) task_count,
+ (u_long) future_count,
+ (u_long) capsule_count,
+ (u_long) methodobject_count));
+ }
+
+ // Close things down.
+ andres->end ();
+ peter->end ();
+ helmut->end ();
+ matias->end ();
+
+ ACE_OS::sleep (2);
+
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) task_count %d future_count %d capsule_count %d methodobject_count %d\n",
+ (u_long) task_count,
+ (u_long) future_count,
+ (u_long) capsule_count,
+ (u_long) methodobject_count));
+
+ ACE_DEBUG ((LM_DEBUG,"(%t) th' that's all folks!\n"));
+
+ ACE_OS::sleep (5);
+ return 0;
+}
+
+#if defined (ACE_TEMPLATES_REQUIRE_SPECIALIZATION)
+template class ACE_Atomic_Op<ACE_Thread_Mutex, u_long>;
+#endif /* ACE_TEMPLATES_REQUIRE_SPECIALIZATION */
+
+#else
+int
+main (int, char *[])
+{
+ ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n"));
+ return 0;
+}
+#endif /* ACE_HAS_THREADS */
diff --git a/examples/Threads/future2.cpp b/examples/Threads/future2.cpp
new file mode 100644
index 00000000000..55ce8c05a40
--- /dev/null
+++ b/examples/Threads/future2.cpp
@@ -0,0 +1,524 @@
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// tests
+//
+// = FILENAME
+// Test_Future.cpp
+//
+// = DESCRIPTION
+// This example tests the ACE Future.
+//
+// = AUTHOR
+// Andres Kruse <Andres.Kruse@cern.ch> and Douglas C. Schmidt
+// <schmidt@cs.wustl.edu>
+//
+// Modification History
+// Aug. 96; A.Kruse; dev.
+// Aug. 96; D.Schmidt; complete workover
+// 08/27/96; A.Kruse; - the friends of Scheduler are "Method_Object_name"
+// and "Method_Object_work".
+// - make the methods "work_i" and "name_i" private
+// 09/2/96; D.Schmidt; Integrate with new ACE_Future API and rearrange
+// the tests so they are more modular.
+// ============================================================================
+
+#include <math.h>
+#include "ace/Task.h"
+
+#include "ace/Synch.h"
+#include "ace/Message_Queue.h"
+#include "ace/Future.h"
+#include "ace/Method_Object.h"
+#include "ace/Activation_Queue.h"
+#include "ace/Auto_Ptr.h"
+
+#if defined (ACE_HAS_THREADS)
+
+typedef ACE_Atomic_Op<ACE_Thread_Mutex, u_long> ATOMIC_INT;
+
+// a counter for the tasks..
+static ATOMIC_INT scheduler_open_count (0);
+
+// forward declarations
+class Method_Object_work;
+class Method_Object_name;
+
+class Scheduler : public ACE_Task<ACE_MT_SYNCH>
+ // = TITLE
+ // Active Object Scheduler.
+{
+ // Every method object has to be able to access the private methods.
+
+ friend class Method_Object_work;
+ friend class Method_Object_name;
+ friend class Method_Object_end;
+public:
+
+ Scheduler (const char *, Scheduler * = 0);
+ ~Scheduler (void);
+
+ virtual int open (void *args = 0);
+ // The method that is used to start the active object.
+
+ // = Here are the methods exported by the class. They return an
+ // <ACE_Future>.
+ ACE_Future<double> work (double param, int count);
+ ACE_Future<char*> name (void);
+ void end (void);
+
+private:
+ virtual int close (u_long flags = 0);
+ // Should not be accessible from outside... (use end () instead).
+
+ virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0)
+ { return 0; };
+ // Doesn't have any use for this example.
+
+ virtual int svc (void);
+ // Here the actual servicing of all requests is happening..
+
+ // = Implementation methods.
+ double work_i (double, int);
+ char *name_i (void);
+
+ char *name_;
+ ACE_Activation_Queue activation_queue_;
+ Scheduler *scheduler_;
+};
+
+class Method_Object_work : public ACE_Method_Object
+ // = TITLE
+ // Reification of the <work> method.
+{
+public:
+ Method_Object_work (Scheduler *, double, int, ACE_Future<double> &);
+ ~Method_Object_work (void);
+ virtual int call (void);
+
+private:
+ Scheduler *scheduler_;
+ double param_;
+ int count_;
+ ACE_Future<double> future_result_;
+};
+
+Method_Object_work::Method_Object_work (Scheduler* new_Scheduler,
+ double new_param,
+ int new_count,
+ ACE_Future<double> &new_result)
+ : scheduler_ (new_Scheduler),
+ param_ (new_param),
+ count_ (new_count),
+ future_result_ (new_result)
+{
+}
+
+Method_Object_work::~Method_Object_work (void)
+{
+}
+
+int
+Method_Object_work::call (void)
+{
+ return this->future_result_.set (this->scheduler_->work_i (this->param_, this->count_));
+}
+
+class Method_Object_name : public ACE_Method_Object
+ // = TITLE
+ // Reification of the <name> method.
+{
+public:
+ Method_Object_name (Scheduler *, ACE_Future<char*> &);
+ ~Method_Object_name (void);
+ virtual int call (void);
+
+private:
+ Scheduler *scheduler_;
+ ACE_Future<char*> future_result_;
+};
+
+
+Method_Object_name::Method_Object_name (Scheduler *new_scheduler,
+ ACE_Future<char*> &new_result)
+ : scheduler_ (new_scheduler),
+ future_result_ (new_result)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ " (%t) Method_Object_name created\n"));
+};
+
+Method_Object_name::~Method_Object_name (void)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ " (%t) Method_Object_name will be deleted.\n"));
+}
+
+int
+Method_Object_name::call (void)
+{
+ return future_result_.set (scheduler_->name_i ());
+}
+
+class Method_Object_end : public ACE_Method_Object
+ // = TITLE
+ // Reification of the <end> method.
+{
+public:
+ Method_Object_end (Scheduler *new_Scheduler): scheduler_ (new_Scheduler) {}
+ ~Method_Object_end (void) {}
+ virtual int call (void) { this->scheduler_->close (); return -1; }
+
+private:
+ Scheduler *scheduler_;
+};
+
+// constructor
+Scheduler::Scheduler (const char *newname, Scheduler *new_Scheduler)
+{
+ ACE_NEW (this->name_, char[ACE_OS::strlen (newname) + 1]);
+ ACE_OS::strcpy ((char *) this->name_, newname);
+ this->scheduler_ = new_Scheduler;
+ ACE_DEBUG ((LM_DEBUG, " (%t) Scheduler %s created\n", this->name_));
+}
+
+// Destructor
+Scheduler::~Scheduler (void)
+{
+ ACE_DEBUG ((LM_DEBUG, " (%t) Scheduler %s will be destroyed\n", this->name_));
+}
+
+int
+Scheduler::open (void *)
+{
+ scheduler_open_count++;
+ ACE_DEBUG ((LM_DEBUG, " (%t) Scheduler %s open\n", this->name_));
+ return this->activate (THR_BOUND);
+}
+
+int
+Scheduler::close (u_long)
+{
+ ACE_DEBUG ((LM_DEBUG, " (%t) Scheduler %s close\n", this->name_));
+ scheduler_open_count--;
+ return 0;
+}
+
+int
+Scheduler::svc (void)
+{
+ // Main event loop for this active object.
+ for (;;)
+ {
+ // Dequeue the next method object (we use an auto pointer in
+ // case an exception is thrown in the <call>).
+ auto_ptr<ACE_Method_Object> mo (this->activation_queue_.dequeue ());
+
+ ACE_DEBUG ((LM_DEBUG, " (%t) calling method object\n"));
+ // Call it.
+ if (mo->call () == -1)
+ break;
+ // Smart pointer destructor automatically deletes mo.
+ }
+
+ /* NOTREACHED */
+ return 0;
+}
+
+void
+Scheduler::end (void)
+{
+ this->activation_queue_.enqueue (new Method_Object_end (this));
+}
+
+// Here's where the Work takes place.
+double
+Scheduler::work_i (double param,
+ int count)
+{
+ double x = 0, y = 0;
+
+ for (int j = 0; j < count; j++)
+ {
+ x = x + param;
+ y = y + ::sin (x);
+ }
+
+ return y;
+}
+
+char *
+Scheduler::name_i (void)
+{
+ char *the_name;
+
+ the_name = new char[ACE_OS::strlen (this->name_) + 1];
+ ACE_OS::strcpy (the_name, this->name_);
+
+ return the_name;
+}
+
+ACE_Future<char *>
+Scheduler::name (void)
+{
+ if (this->scheduler_)
+ // Delegate to the other scheduler
+ return this->scheduler_->name ();
+ else
+ {
+ ACE_Future<char*> new_future;
+
+ if (this->thr_count () == 0)
+ {
+ // This scheduler is inactive... so we execute the user
+ // request right away...
+
+ auto_ptr<ACE_Method_Object> mo (new Method_Object_name (this, new_future));
+
+ mo->call ();
+ // Smart pointer destructor automatically deletes mo.
+ }
+ else
+ // @@ What happens if new fails here?
+ this->activation_queue_.enqueue
+ (new Method_Object_name (this, new_future));
+
+ return new_future;
+ }
+}
+
+ACE_Future<double>
+Scheduler::work (double newparam, int newcount)
+{
+ if (this->scheduler_)
+ return this->scheduler_->work (newparam, newcount);
+ else
+ {
+ ACE_Future<double> new_future;
+
+ if (this->thr_count () == 0)
+ {
+ auto_ptr<ACE_Method_Object> mo
+ (new Method_Object_work (this, newparam, newcount, new_future));
+ mo->call ();
+ // Smart pointer destructor automatically deletes it.
+ }
+ else
+ this->activation_queue_.enqueue
+ (new Method_Object_work (this, newparam, newcount, new_future));
+
+ return new_future;
+ }
+}
+
+static int
+determine_iterations (void)
+{
+ int n_iterations;
+
+ ACE_DEBUG ((LM_DEBUG," (%t) determining the number of iterations...\n"));
+ Scheduler *worker_a = new Scheduler ("worker A");
+
+ ACE_Time_Value tstart (ACE_OS::gettimeofday ());
+ ACE_Time_Value tend (ACE_OS::gettimeofday ());
+
+ // Determine the number of iterations... we want so many that the
+ // work () takes about 1 second...
+
+ for (n_iterations = 1;
+ (tend.sec () - tstart.sec ()) < 1;
+ n_iterations *= 2)
+ {
+ tstart = ACE_OS::gettimeofday ();
+
+ worker_a->work (0.1, n_iterations);
+
+ tend = ACE_OS::gettimeofday ();
+ }
+
+ ACE_DEBUG ((LM_DEBUG," (%t) n_iterations %d\n",
+ (u_long) n_iterations));
+
+ worker_a->end ();
+ // @@ Can we safely delete worker_a here?
+ return n_iterations;
+}
+
+static void
+test_active_object (int n_iterations)
+{
+ ACE_DEBUG ((LM_DEBUG," (%t) testing active object pattern...\n"));
+ // A simple example for the use of the active object pattern and
+ // futures to return values from an active object.
+
+ Scheduler *worker_a = new Scheduler ("worker A");
+ Scheduler *worker_b = new Scheduler ("worker B");
+
+ // Have worker_c delegate his work to worker_a.
+ Scheduler *worker_c = new Scheduler ("worker C", worker_a);
+
+ // loop 0:
+ // test the Schedulers when they are not active.
+ // now the method objects will be created but since
+ // there is no active thread they will also be
+ // immediately executed, in the "main" thread.
+ // loop 1:
+ // do the same test but with the schedulers
+ // activated
+ for (int i = 0; i < 2; i++)
+ {
+ if (i == 1)
+ {
+ worker_a->open ();
+ worker_b->open ();
+ worker_c->open ();
+ }
+
+ ACE_Future<double> fresulta = worker_a->work (0.01, n_iterations);
+ ACE_Future<double> fresultb = worker_b->work (0.02, n_iterations);
+ ACE_Future<double> fresultc = worker_c->work (0.03, n_iterations);
+
+ if (i == 0)
+ {
+ if (!fresulta.ready ())
+ ACE_DEBUG ((LM_DEBUG," (%t) ERROR: worker A is should be ready!!!\n"));
+ if (!fresultb.ready ())
+ ACE_DEBUG ((LM_DEBUG," (%t) ERROR: worker B is should be ready!!!\n"));
+ if (!fresultc.ready ())
+ ACE_DEBUG ((LM_DEBUG," (%t) ERROR: worker C is should be ready!!!\n"));
+ }
+
+ // When the workers are active we will block here until the
+ // results are available.
+
+ double resulta = fresulta;
+ double resultb = fresultb;
+ double resultc = fresultc;
+
+ ACE_Future<char *> fnamea = worker_a->name ();
+ ACE_Future<char *> fnameb = worker_b->name ();
+ ACE_Future<char *> fnamec = worker_c->name ();
+
+ char *namea = fnamea;
+ char *nameb = fnameb;
+ char *namec = fnamec;
+
+ ACE_DEBUG ((LM_DEBUG, " (%t) result from %s %f\n",
+ namea, resulta));
+ ACE_DEBUG ((LM_DEBUG, " (%t) result from %s %f\n",
+ nameb, resultb));
+ ACE_DEBUG ((LM_DEBUG, " (%t) result from %s %f\n",
+ namec, resultc));
+ }
+
+ ACE_DEBUG ((LM_DEBUG, " (%t) scheduler_open_count %d before end ()\n",
+ (u_long) scheduler_open_count));
+
+ worker_a->end ();
+ worker_b->end ();
+ worker_c->end ();
+
+ ACE_DEBUG ((LM_DEBUG, " (%t) scheduler_open_count %d immediately after end ()\n",
+ (u_long) scheduler_open_count));
+
+ ACE_OS::sleep (2);
+
+ ACE_DEBUG ((LM_DEBUG, " (%t) scheduler_open_count %d after waiting\n",
+ (u_long) scheduler_open_count));
+ // @@ Can we safely delete worker_a, worker_b, and worker_c?
+}
+
+static void
+test_cancellation (int n_iterations)
+{
+ ACE_DEBUG ((LM_DEBUG," (%t) testing cancellation of a future...\n"));
+
+ // Now test the cancelling a future.
+
+ Scheduler *worker_a = new Scheduler ("worker A");
+ worker_a->open ();
+
+ ACE_Future<double> fresulta = worker_a->work (0.01, n_iterations);
+
+ // save the result by copying the future
+ ACE_Future<double> fresultb = fresulta;
+
+ // now we cancel the first future.. but the
+ // calculation will still go on...
+ fresulta.cancel (10.0);
+
+ if (!fresulta.ready ())
+ ACE_DEBUG ((LM_DEBUG," (%t) ERROR: future A is should be ready!!!\n"));
+
+ double resulta = fresulta;
+
+ ACE_DEBUG ((LM_DEBUG, " (%t) cancelled result %f\n", resulta));
+
+ if (resulta != 10.0)
+ ACE_DEBUG ((LM_DEBUG, " (%t) cancelled result should be 10.0!!\n", resulta));
+
+ resulta = fresultb;
+
+ ACE_DEBUG ((LM_DEBUG, " (%t) true result %f\n", resulta));
+
+ worker_a->end ();
+ // @@ Can we safely delete worker_a here?
+}
+
+static void
+test_timeout (int n_iterations)
+{
+ ACE_DEBUG ((LM_DEBUG," (%t) testing timeout on waiting for the result...\n"));
+ Scheduler *worker_a = new Scheduler ("worker A");
+ worker_a->open ();
+
+ ACE_Future<double> fresulta = worker_a->work (0.01, 2 * n_iterations);
+
+ // Should immediately return... and we should see an error...
+ ACE_Time_Value *delay = new ACE_Time_Value (1);
+
+ double resulta;
+ fresulta.get (resulta, delay);
+
+ if (fresulta.ready ())
+ ACE_DEBUG ((LM_DEBUG," (%t) ERROR: future A is should not be ready!!!\n"));
+ else
+ ACE_DEBUG ((LM_DEBUG," (%t) timed out on future A\n"));
+
+ // now we wait until we are done...
+ fresulta.get (resulta);
+ ACE_DEBUG ((LM_DEBUG, " (%t) result %f\n", resulta));
+
+ worker_a->end ();
+ // @@ Can we safely delete worker_a here?
+}
+
+int
+main (int, char *[])
+{
+ int n_iterations = determine_iterations ();
+
+ test_active_object (n_iterations);
+ test_cancellation (n_iterations);
+ test_timeout (n_iterations);
+
+ ACE_DEBUG ((LM_DEBUG," (%t) that's all folks!\n"));
+
+ ACE_OS::sleep (5);
+ return 0;
+}
+
+#if defined (ACE_TEMPLATES_REQUIRE_SPECIALIZATION)
+template class ACE_Atomic_Op<ACE_Thread_Mutex, u_long>;
+#endif /* ACE_TEMPLATES_REQUIRE_SPECIALIZATION */
+
+#else
+int
+main (int, char *[])
+{
+ ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n"));
+ return 0;
+}
+#endif /* ACE_HAS_THREADS */
diff --git a/examples/Threads/manual_event.cpp b/examples/Threads/manual_event.cpp
new file mode 100644
index 00000000000..26d477fabc0
--- /dev/null
+++ b/examples/Threads/manual_event.cpp
@@ -0,0 +1,108 @@
+// The test shows the use of an ACE_Manual_Event to create a
+// $Id$
+
+// Pseudo_Barrier. Multiple threads are created which do the
+// following:
+//
+// 1. work
+// 2. synch with other threads
+// 3. more work
+//
+// ACE_Manual_Event is use to synch with other
+// threads. ACE_Manual_Event::signal() is used for broadcasting.
+
+#include "ace/Service_Config.h"
+#include "ace/Synch.h"
+#include "ace/Thread_Manager.h"
+
+#if defined (ACE_HAS_THREADS)
+static ACE_Atomic_Op <ACE_Thread_Mutex, u_long> amount_of_work = (u_long) 0;
+
+class Pseudo_Barrier
+ // = TITLE
+ // A barrier class using ACE manual-reset events.
+ //
+ // = DESCRIPTION
+ // This is *not* a real barrier.
+ // Pseudo_Barrier is more like a ``one shot'' barrier.
+ // All waiters after the Nth waiter are allowed to go.
+ // The barrier does not reset after the Nth waiter.
+ // For an example of a real barrier, please see class ACE_Barrier.
+{
+public:
+ Pseudo_Barrier (u_long count);
+
+ int wait (void);
+
+private:
+ ACE_Atomic_Op <ACE_Thread_Mutex, u_long> counter_;
+ ACE_Manual_Event event_;
+};
+
+Pseudo_Barrier::Pseudo_Barrier (u_long count)
+ : counter_ (count)
+{
+}
+
+int
+Pseudo_Barrier::wait (void)
+{
+ if (--this->counter_ == 0)
+ return this->event_.signal ();
+ else
+ return this->event_.wait ();
+}
+
+static void *
+worker (void *arg)
+{
+ ACE_Thread_Control tc (ACE_Service_Config::thr_mgr ());
+ Pseudo_Barrier &barrier = *(Pseudo_Barrier *) arg;
+
+ // work
+ ACE_DEBUG ((LM_DEBUG, "(%t) working (%d secs)\n", ++::amount_of_work));
+ ACE_OS::sleep (::amount_of_work);
+
+ // synch with everybody else
+ ACE_DEBUG ((LM_DEBUG, "(%t) waiting to synch with others \n"));
+ barrier.wait ();
+
+ // more work
+ ACE_DEBUG ((LM_DEBUG, "(%t) more work (%d secs)\n", ++::amount_of_work));
+ ACE_OS::sleep (amount_of_work);
+
+ ACE_DEBUG ((LM_DEBUG, "(%t) dying \n"));
+
+ return 0;
+}
+
+int
+main (int argc, char **argv)
+{
+ int n_threads = argc == 2 ? atoi (argv[1]) : 5;
+
+ ACE_Thread_Manager &tm = *ACE_Service_Config::thr_mgr ();
+
+ // synch object shared by all threads
+ Pseudo_Barrier barrier (n_threads);
+
+ // create workers
+ if (tm.spawn_n (n_threads, worker, &barrier) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "thread creates for worker failed"), -1);
+
+ // wait for all workers to exit
+ if (tm.wait () == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "thread wait failed"), -1);
+ else
+ ACE_DEBUG ((LM_ERROR, "graceful exit\n"));
+
+ return 0;
+}
+#else
+int
+main (int, char *[])
+{
+ ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n"));
+ return 0;
+}
+#endif /* ACE_HAS_THREADS */
diff --git a/examples/Threads/process_mutex.cpp b/examples/Threads/process_mutex.cpp
new file mode 100644
index 00000000000..fb23c8b73be
--- /dev/null
+++ b/examples/Threads/process_mutex.cpp
@@ -0,0 +1,68 @@
+// $Id$
+
+// This program tests ACE_Process_Mutexes. To run it, open 3 or 4
+// windows and run this program in each window...
+
+#include "ace/Synch.h"
+#include "ace/Signal.h"
+
+#if defined (ACE_HAS_THREADS)
+
+static sig_atomic_t done;
+
+extern "C" void
+handler (int)
+{
+ done = 1;
+}
+
+int
+main (int argc, char *argv[])
+{
+ char *name = argc > 1 ? argv[1] : "hello";
+ int iterations = argc > 2 ? ACE_OS::atoi (argv[2]) : 100;
+
+ ACE_Process_Mutex pm (name);
+
+ // Register a signal handler.
+ ACE_Sig_Action sa ((ACE_SignalHandler) handler, SIGINT);
+
+ for (int i = 0; i < iterations && !done; i++)
+ {
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) = acquiring\n"));
+ if (pm.acquire () == -1)
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) = %p\n", "acquire failed"));
+ else
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) = acquired\n"));
+
+ ACE_OS::sleep (3);
+
+ if (pm.release () == -1)
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) = %p\n", "release failed"));
+ else
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) = released\n"));
+
+ if (pm.tryacquire () == -1)
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) = %p\n", "tryacquire failed"));
+ else
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) = tryacquire\n"));
+
+ if (pm.release () == -1)
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) = %p\n", "release failed"));
+ else
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) = released\n"));
+ }
+
+ if (argc > 2)
+ pm.remove ();
+ return 0;
+}
+#else
+int
+main (void)
+{
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "ACE doesn't support support threads on this platform (yet)\n"),
+ -1);
+}
+#endif /* ACE_HAS_THREADS */
diff --git a/examples/Threads/process_semaphore.cpp b/examples/Threads/process_semaphore.cpp
new file mode 100644
index 00000000000..d7933897f8e
--- /dev/null
+++ b/examples/Threads/process_semaphore.cpp
@@ -0,0 +1,56 @@
+// $Id$
+
+// This program tests ACE_Process_Semaphore. To run it, open 3 or 4
+// windows and run this program in each window...
+
+#include "ace/Synch.h"
+#include "ace/Signal.h"
+
+static sig_atomic_t done;
+
+extern "C" void
+handler (int)
+{
+ done = 1;
+}
+
+int
+main (int argc, char *argv[])
+{
+ char *name = argc == 1 ? "hello" : argv[1];
+
+ ACE_Process_Semaphore pm (1, name);
+
+ ACE_Sig_Action sa ((ACE_SignalHandler) handler, SIGINT);
+
+ for (int i = 0; i < 100 && !done; i++)
+ {
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) = acquiring\n"));
+ if (pm.acquire () == -1)
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) = %p\n", "acquire failed"));
+ else
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) = acquired\n"));
+
+ ACE_OS::sleep (3);
+
+ if (pm.release () == -1)
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) = %p\n", "release failed"));
+ else
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) = released\n"));
+
+ if (pm.tryacquire () == -1)
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) = %p\n", "tryacquire failed"));
+ else
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) = tryacquire\n"));
+
+ if (pm.release () == -1)
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) = %p\n", "release failed"));
+ else
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) = released\n"));
+ }
+
+ if (argc > 2)
+ pm.remove ();
+ return 0;
+}
+
diff --git a/examples/Threads/reader_writer.cpp b/examples/Threads/reader_writer.cpp
new file mode 100644
index 00000000000..32ef262c67e
--- /dev/null
+++ b/examples/Threads/reader_writer.cpp
@@ -0,0 +1,187 @@
+// This test program verifies the functionality of the ACE_OS
+// $Id$
+
+// implementation of readers/writer locks on Win32 and Posix pthreads.
+
+
+#include "ace/Synch.h"
+#include "ace/Thread.h"
+#include "ace/Thread_Manager.h"
+#include "ace/Get_Opt.h"
+
+#if defined (ACE_HAS_THREADS)
+
+// Default number of iterations.
+static int n_iterations = 1000;
+
+// Default number of loops.
+static int n_loops = 100;
+
+// Default number of readers.
+static int n_readers = 6;
+
+// Default number of writers.
+static int n_writers = 2;
+
+// Thread id of last writer.
+volatile static int shared_data;
+
+// Lock for shared_data.
+static ACE_RW_Mutex rw_mutex;
+
+// Count of the number of readers and writers.
+ACE_Atomic_Op<ACE_Thread_Mutex, int> current_readers, current_writers;
+
+// Thread manager
+static ACE_Thread_Manager thr_mgr;
+
+// Explain usage and exit.
+static void
+print_usage_and_die (void)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ "usage: %n [-r n_readers] [-w n_writers] [-n iteration_count]\n"));
+ ACE_OS::exit (1);
+}
+
+// Parse the command-line arguments and set options.
+static void
+parse_args (int argc, char *argv[])
+{
+ ACE_Get_Opt get_opt (argc, argv, "r:w:n:l:");
+
+ int c;
+
+ while ((c = get_opt ()) != -1)
+ switch (c)
+ {
+ case 'r':
+ n_readers = ACE_OS::atoi (get_opt.optarg);
+ break;
+ case 'w':
+ n_writers = ACE_OS::atoi (get_opt.optarg);
+ break;
+ case 'n':
+ n_iterations = ACE_OS::atoi (get_opt.optarg);
+ break;
+ case 'l':
+ n_loops = ACE_OS::atoi (get_opt.optarg);
+ break;
+ default:
+ print_usage_and_die ();
+ break;
+ }
+}
+
+// Iterate <n_iterations> each time checking that nobody modifies the data
+// while we have a read lock.
+
+static void *
+reader (void *)
+{
+ ACE_Thread_Control tc (&thr_mgr);
+ ACE_DEBUG ((LM_DEBUG, "(%t) reader starting\n"));
+
+ for (int iterations = 1; iterations <= n_iterations; iterations++)
+ {
+ ACE_Read_Guard<ACE_RW_Mutex> g(rw_mutex);
+ int n = ++current_readers;
+ //ACE_DEBUG ((LM_DEBUG, "(%t) I'm reader number %d\n", n));
+
+ if (current_writers > 0)
+ ACE_DEBUG ((LM_DEBUG, "(%t) writers found!!!\n"));
+
+ int data = shared_data;
+
+ for (int loop = 1; loop <= n_loops; loop++)
+ {
+ ACE_Thread::yield();
+ if (shared_data != data)
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) somebody changed %d to %d\n",
+ data, shared_data));
+ }
+
+ --current_readers;
+ //ACE_DEBUG ((LM_DEBUG, "(%t) done with reading guarded data\n"));
+
+ ACE_Thread::yield ();
+ }
+ return 0;
+}
+
+// Iterate <n_iterations> each time modifying the global data
+// and checking that nobody steps on it while we can write it.
+
+static void *
+writer (void *)
+{
+ ACE_Thread_Control tc (&thr_mgr);
+ ACE_DEBUG ((LM_DEBUG, "(%t) writer starting\n"));
+
+ for (int iterations = 1; iterations <= n_iterations; iterations++)
+ {
+ ACE_Write_Guard<ACE_RW_Mutex> g(rw_mutex);
+
+ ++current_writers;
+ //ACE_DEBUG ((LM_DEBUG, "(%t) writing to guarded data\n"));
+
+ if (current_writers > 1)
+ ACE_DEBUG ((LM_DEBUG, "(%t) other writers found!!!\n"));
+
+ if (current_readers > 0)
+ ACE_DEBUG ((LM_DEBUG, "(%t) readers found!!!\n"));
+
+ int self = (int) ACE_Thread::self ();
+ shared_data = self;
+
+ for (int loop = 1; loop <= n_loops; loop++)
+ {
+ ACE_Thread::yield();
+ if (shared_data != self)
+ ACE_DEBUG ((LM_DEBUG, "(%t) somebody wrote on my data %d\n", shared_data));
+ }
+
+ --current_writers;
+
+ //ACE_DEBUG ((LM_DEBUG, "(%t) done with guarded data\n"));
+ ACE_Thread::yield ();
+ }
+ return 0;
+}
+
+// Spawn off threads.
+
+int main (int argc, char *argv[])
+{
+ ACE_LOG_MSG->open (argv[0]);
+ parse_args (argc, argv);
+
+ current_readers = 0; // Possibly already done
+ current_writers = 0; // Possibly already done
+
+ ACE_DEBUG ((LM_DEBUG, "(%t) main thread starting\n"));
+
+ if (thr_mgr.spawn_n (n_readers, reader, 0, THR_NEW_LWP) == -1 ||
+ thr_mgr.spawn_n (n_writers, writer, 0, THR_NEW_LWP) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "spawn_n"), 1);
+
+ thr_mgr.wait ();
+
+ ACE_DEBUG ((LM_DEBUG, "(%t) exiting main thread\n"));
+ return 0;
+}
+
+#if defined (ACE_TEMPLATES_REQUIRE_SPECIALIZATION)
+template class ACE_Atomic_Op<ACE_Thread_Mutex, int>;
+#endif /* ACE_TEMPLATES_REQUIRE_SPECIALIZATION */
+
+#else
+int
+main (int, char *[])
+{
+ ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n"));
+ return 0;
+}
+#endif /* ACE_HAS_THREADS */
+
diff --git a/examples/Threads/recursive_mutex.cpp b/examples/Threads/recursive_mutex.cpp
new file mode 100644
index 00000000000..1cc2892b2a8
--- /dev/null
+++ b/examples/Threads/recursive_mutex.cpp
@@ -0,0 +1,108 @@
+// $Id$
+
+// This test program verifies the functionality of the ACE_OS
+// implementation of recursive mutexes on Win32 and Posix pthreads.
+
+#include "ace/Service_Config.h"
+#include "ace/Get_Opt.h"
+#include "ace/Synch.h"
+
+#if defined (ACE_HAS_THREADS)
+
+// Total number of iterations.
+static size_t n_iterations = 1000;
+static size_t n_threads = 4;
+
+// Explain usage and exit.
+static void
+print_usage_and_die (void)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ "usage: %n [-t n_threads] [-n iteration_count]\n"));
+ ACE_OS::exit (1);
+}
+
+// Parse the command-line arguments and set options.
+
+static void
+parse_args (int argc, char *argv[])
+{
+ ACE_Get_Opt get_opt (argc, argv, "n:t:");
+
+ int c;
+
+ while ((c = get_opt ()) != -1)
+ switch (c)
+ {
+ case 'n':
+ n_iterations = ACE_OS::atoi (get_opt.optarg);
+ break;
+ case 't':
+ n_threads = ACE_OS::atoi (get_opt.optarg);
+ break;
+ default:
+ print_usage_and_die ();
+ break;
+ }
+}
+
+static void
+recursive_worker (size_t nesting_level,
+ ACE_Recursive_Thread_Mutex *rm)
+{
+ if (nesting_level < n_iterations)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) = trying to acquire, nesting = %d, thread id = %u\n",
+ rm->get_nesting_level (), rm->get_thread_id ()));
+ {
+ // This illustrates the use of the ACE_Guard<LOCK> with an
+ // ACE_Recursive_Thread_Mutex.
+ ACE_GUARD (ACE_Recursive_Thread_Mutex, ace_mon, *rm);
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) = acquired, nesting = %d, thread id = %u\n",
+ rm->get_nesting_level (), rm->get_thread_id ()));
+
+ recursive_worker (nesting_level + 1, rm);
+ }
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) = released, nesting = %d, thread id = %u\n",
+ rm->get_nesting_level (), rm->get_thread_id ()));
+ }
+}
+
+static void *
+worker (void *arg)
+{
+ ACE_Thread_Control tc (ACE_Service_Config::thr_mgr ());
+
+ ACE_Recursive_Thread_Mutex *rm = (ACE_Recursive_Thread_Mutex *) arg;
+
+ recursive_worker (0, rm);
+ return 0;
+}
+
+int
+main (int argc, char *argv[])
+{
+ ACE_Service_Config daemon (argv[0]);
+
+ parse_args (argc, argv);
+ ACE_Recursive_Thread_Mutex rm;
+
+ ACE_Service_Config::thr_mgr ()->spawn_n (n_threads,
+ ACE_THR_FUNC (worker),
+ (void *) &rm);
+
+ ACE_Service_Config::thr_mgr ()->wait ();
+ return 0;
+}
+#else
+int
+main (void)
+{
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "ACE doesn't support support process mutexes on this platform (yet)\n"),
+ -1);
+}
+#endif /* ACE_WIN32 */
diff --git a/examples/Threads/task_four.cpp b/examples/Threads/task_four.cpp
new file mode 100644
index 00000000000..64209cb3430
--- /dev/null
+++ b/examples/Threads/task_four.cpp
@@ -0,0 +1,248 @@
+// $Id$
+
+// The following test was written by Hamutal Yanay & Ari Erev's
+// (Ari_Erev@comverse.com).
+//
+// This test program test enhancements to the thread_manager and task
+// classes. The purpose of these enhancements was to allow the
+// thread_manager to recognize the concept of an ACE_Task and to be
+// able to group ACE_Tasks in groups.
+//
+// There are two main ACE_Tasks in this sample:
+//
+// Invoker_Task - is run from main (). It's purpose is to run a number of
+// ACE_Tasks of type Worker_Task. The number can be specified
+// on the command line.
+// After starting the tasks, the Invoker_Task groups all the tasks
+// in one group and then uses the
+// num_tasks_in_group () to find out if the real number of tasks
+// that are now running (should be the same as the number of tasks
+// started).
+// It also, suspends and resumes all the threads in the group to
+// test the suspend_grp () and resume_grp () methods.
+// Then it waits for all the tasks to end.
+// Worker_Task - ACE_Tasks that are started by the Invoker_Task.
+// Each Worker_Task can start a number of threads.
+// The Worker_Task threads perform some work (iteration). The number
+// of the iterations can be specified on the command line.
+//
+// The command line syntax is:
+//
+// test_task [num_tasks] [num_threads] [num_iterations]
+
+#include "ace/Task.h"
+#include "ace/Service_Config.h"
+
+
+#if defined (ACE_HAS_THREADS)
+
+#include "ace/Task.h"
+
+class Invoker_Task : public ACE_Task<ACE_MT_SYNCH>
+{
+public:
+ Invoker_Task (ACE_Thread_Manager *thr_mgr,
+ int n_tasks,
+ int n_threads,
+ int n_iterations);
+ virtual int svc (void);
+ // creats <n_tasks> and wait for them to finish
+
+private:
+ int n_tasks_;
+ // Number of tasks to start.
+ int n_threads_;
+ // Number of threads per task.
+ int n_iterations_;
+ // Number of iterations per thread.
+
+ // = Not needed for this test.
+ virtual int open (void *) { return 0; }
+ virtual int close (u_long) { return 0; }
+ virtual int put (ACE_Message_Block *, ACE_Time_Value *) { return 0; }
+};
+
+class Worker_Task : public ACE_Task<ACE_MT_SYNCH>
+{
+public:
+ Worker_Task (ACE_Thread_Manager *thr_mgr,
+ int n_threads,
+ int n_iterations);
+ virtual int svc (void);
+ // Does a small work...
+ virtual int open (void * = NULL);
+private:
+ static int workers_count_;
+ int index_;
+ int n_threads_;
+ int n_iterations_;
+
+ // = Not needed for this test.
+ virtual int close (u_long) { return 0; }
+ virtual int put (ACE_Message_Block *, ACE_Time_Value *) { return 0; }
+};
+
+int Worker_Task::workers_count_ = 1;
+
+Worker_Task::Worker_Task (ACE_Thread_Manager *thr_mgr,
+ int n_threads,
+ int n_iterations)
+ : n_threads_ (n_threads),
+ n_iterations_ (n_iterations),
+ ACE_Task<ACE_MT_SYNCH> (thr_mgr)
+{
+ index_ = workers_count_++;
+}
+
+int
+Worker_Task::open (void *)
+{
+ // Create worker threads.
+ int rc = this->activate (THR_NEW_LWP, n_threads_, 0, 0, -1, this);
+
+ if (rc == -1)
+ ACE_ERROR ((LM_ERROR, "%p\n", "activate failed"));
+
+ return rc;
+}
+
+int
+Worker_Task::svc (void)
+{
+ ACE_DEBUG ((LM_DEBUG, " (%t) in worker %d\n", index_));
+
+ for (int iterations = 1;
+ iterations <= this->n_iterations_;
+ iterations++)
+ {
+ ACE_DEBUG ((LM_DEBUG, " (%t) in iteration %d\n", iterations));
+ ACE_OS::sleep (0);
+ }
+
+ ACE_DEBUG ((LM_DEBUG, " (%t) worker %d ends\n", index_));
+
+ return 0;
+}
+
+Invoker_Task::Invoker_Task (ACE_Thread_Manager *thr_mgr,
+ int n_tasks,
+ int n_threads,
+ int n_iterations)
+ : n_tasks_ (n_tasks),
+ n_threads_ (n_threads),
+ n_iterations_ (n_iterations),
+ ACE_Task<ACE_MT_SYNCH> (thr_mgr)
+{
+ // Create worker threads.
+ if (this->activate (THR_NEW_LWP, 1, 0, 0, -1, this) == -1)
+ ACE_ERROR ((LM_ERROR, "%p\n", "activate failed"));
+}
+
+// Iterate <n_iterations> time printing off a message and "waiting"
+// for all other threads to complete this iteration.
+
+int
+Invoker_Task::svc (void)
+{
+ // Note that the ACE_Task::svc_run () method automatically adds us to
+ // the Thread_Manager when the thread begins.
+
+ ACE_Thread_Manager *thr_mgr = ACE_Service_Config::thr_mgr ();
+ Worker_Task **pTask = new Worker_Task* [n_tasks_];
+
+ for (int task = 0;
+ task < this->n_tasks_;
+ task++)
+ {
+ ACE_DEBUG ((LM_DEBUG, " (%t) in task %d\n", task+1));
+ pTask[task] = new Worker_Task (thr_mgr, n_threads_, n_iterations_);
+ pTask[task]->open ();
+ }
+
+ // Set all tasks to be one group
+ ACE_DEBUG ((LM_DEBUG, " (%t) setting tasks group id\n"));
+ for (task = 0;
+ task < this->n_tasks_;
+ task++)
+ if (thr_mgr->set_grp (pTask[task], 1) == -1)
+ ACE_ERROR ((LM_DEBUG, " (%t) %p\n", "set_grp"));
+
+ int nTasks = thr_mgr->num_tasks_in_group (1);
+ cout << "Number of tasks in group 1: " << nTasks << endl;
+
+ // Wait for 1 second and then suspend every thread in the group.
+ ACE_OS::sleep (1);
+ ACE_DEBUG ((LM_DEBUG, " (%t) suspending group\n"));
+ if (thr_mgr->suspend_grp (1) == -1)
+ ACE_ERROR ((LM_DEBUG, " (%t) %p\n", "suspend_grp"));
+
+ // Wait for 5 more second and then resume every thread in the
+ // group.
+ ACE_OS::sleep (ACE_Time_Value (5));
+
+ // @QTSK This ACE_DEBUG statement blows us away! can't understand why
+ ACE_DEBUG ((LM_DEBUG, " (%t) resuming group\n"));
+ if (thr_mgr->resume_grp (1) == -1)
+ ACE_ERROR ((LM_DEBUG, " (%t) %p\n", "resume_grp"));
+
+
+ // Wait for all the tasks to reach their exit point.
+ thr_mgr->wait ();
+
+ // Note that the ACE_Task::svc_run () method automatically removes us
+ // from the Thread_Manager when the thread exits.
+
+ return 0;
+}
+
+// Default number of tasks and iterations.
+static const int DEFAULT_TASKS = 4;
+static const int DEFAULT_ITERATIONS = 5;
+
+int
+main (int argc, char *argv[])
+{
+ int n_tasks = argc > 1 ? ACE_OS::atoi (argv[1]) : DEFAULT_TASKS;
+ int n_threads = argc > 2 ? ACE_OS::atoi (argv[2]) : ACE_DEFAULT_THREADS;
+ int n_iterations = argc > 3 ? ACE_OS::atoi (argv[3]) : DEFAULT_ITERATIONS;
+
+ // Since ACE_Thread_Manager can only wait for all threads, we'll have
+ // special manager for the Invoker_Task.
+ ACE_Thread_Manager invoker_manager;
+
+ Invoker_Task invoker (&invoker_manager,
+ n_tasks,
+ n_threads,
+ n_iterations);
+
+ // Wait for 1 second and then suspend the invoker task
+ ACE_OS::sleep (1);
+ ACE_DEBUG ((LM_DEBUG, " (%t) suspending invoker task\n"));
+
+ if (invoker_manager.suspend_task (&invoker) == -1)
+ ACE_ERROR ((LM_DEBUG, " (%t) %p\n", "suspend_task"));
+
+ // Wait for 5 more second and then resume the invoker task.
+ ACE_OS::sleep (ACE_Time_Value (5));
+
+ // @QTSK This ACE_DEBUG statement blows us away! can't understand why
+ ACE_DEBUG ((LM_DEBUG, " (%t) resuming invoker task\n"));
+ if (invoker_manager.resume_task (&invoker) == -1)
+ ACE_ERROR ((LM_DEBUG, " (%t) %p\n", "resume_task"));
+
+
+ // Wait for all the threads to reach their exit point.
+ invoker_manager.wait ();
+
+ // @QTSK This ACE_DEBUG statement blows us away! can't understand why
+ ACE_DEBUG ((LM_DEBUG, " (%t) done\n"));
+ return 0;
+}
+#else
+int
+main (int, char *[])
+{
+ ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n"));
+ return 0;
+}
+#endif /* ACE_HAS_THREADS */
diff --git a/examples/Threads/task_one.cpp b/examples/Threads/task_one.cpp
new file mode 100644
index 00000000000..d0a8a12e6c4
--- /dev/null
+++ b/examples/Threads/task_one.cpp
@@ -0,0 +1,104 @@
+// This test program illustrates how the ACE barrier synchronization
+// $Id$
+
+// mechanisms work in conjunction with the ACE_Task and the
+// ACE_Thread_Manager. It is instructive to compare this with the
+// test_barrier.cpp test to see how they differ.
+
+#include "ace/Task.h"
+#include "ace/Service_Config.h"
+
+
+#if defined (ACE_HAS_THREADS)
+
+#include "ace/Task.h"
+
+class Barrier_Task : public ACE_Task<ACE_MT_SYNCH>
+{
+public:
+ Barrier_Task (ACE_Thread_Manager *thr_mgr,
+ int n_threads,
+ int n_iterations);
+
+ virtual int svc (void);
+ // Iterate <n_iterations> time printing off a message and "waiting"
+ // for all other threads to complete this iteration.
+
+private:
+ ACE_Barrier barrier_;
+ // Reference to the tester barrier. This controls each
+ // iteration of the tester function running in every thread.
+
+ int n_iterations_;
+ // Number of iterations to run.
+
+ // = Not needed for this test.
+ virtual int open (void *) { return 0; }
+ virtual int close (u_long) { return 0; }
+ virtual int put (ACE_Message_Block *, ACE_Time_Value *) { return 0; }
+};
+
+Barrier_Task::Barrier_Task (ACE_Thread_Manager *thr_mgr,
+ int n_threads,
+ int n_iterations)
+ : ACE_Task<ACE_MT_SYNCH> (thr_mgr),
+ barrier_ (n_threads),
+ n_iterations_ (n_iterations)
+{
+ // Create worker threads.
+ if (this->activate (THR_NEW_LWP, n_threads) == -1)
+ ACE_ERROR ((LM_ERROR, "%p\n", "activate failed"));
+}
+
+// Iterate <n_iterations> time printing off a message and "waiting"
+// for all other threads to complete this iteration.
+
+int
+Barrier_Task::svc (void)
+{
+ // Note that the ACE_Task::svc_run() method automatically adds us to
+ // the Thread_Manager when the thread begins.
+
+ for (int iterations = 1;
+ iterations <= this->n_iterations_;
+ iterations++)
+ {
+ ACE_DEBUG ((LM_DEBUG, "(%t) in iteration %d\n", iterations));
+
+ // Block until all other threads have waited, then continue.
+ this->barrier_.wait ();
+ }
+
+ // Note that the ACE_Task::svc_run() method automatically removes us
+ // from the Thread_Manager when the thread exits.
+
+ return 0;
+}
+
+// Default number of threads to spawn.
+static const int DEFAULT_ITERATIONS = 5;
+
+int
+main (int argc, char *argv[])
+{
+ int n_threads = argc > 1 ? ACE_OS::atoi (argv[1]) : ACE_DEFAULT_THREADS;
+ int n_iterations = argc > 2 ? ACE_OS::atoi (argv[2]) : DEFAULT_ITERATIONS;
+
+ Barrier_Task barrier_task (ACE_Service_Config::thr_mgr (),
+ n_threads,
+ n_iterations);
+
+ // Wait for all the threads to reach their exit point.
+ ACE_Service_Config::thr_mgr ()->wait ();
+
+ ACE_DEBUG ((LM_DEBUG, "(%t) done\n"));
+ return 0;
+}
+#else
+int
+main (int, char *[])
+{
+ ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n"));
+ return 0;
+}
+#endif /* ACE_HAS_THREADS */
diff --git a/examples/Threads/task_three.cpp b/examples/Threads/task_three.cpp
new file mode 100644
index 00000000000..0214ac10ddf
--- /dev/null
+++ b/examples/Threads/task_three.cpp
@@ -0,0 +1,230 @@
+// $Id$
+
+// Exercise more tests for the ACE Tasks. This also shows off some
+// Interesting uses of the ACE Log_Msg's ability to print to ostreams.
+// BTW, make sure that you set the out_stream in *every* thread that
+// you want to have write to the output file, i.e.:
+//
+//
+// if (out_stream)
+// {
+// ACE_LOG_MSG->set_flags (ACE_Log_Msg::OSTREAM);
+// ACE_LOG_MSG->msg_ostream (out_stream);
+// }
+
+#include <fstream.h>
+#include "ace/Reactor.h"
+#include "ace/Service_Config.h"
+#include "ace/Task.h"
+
+
+#if defined (ACE_HAS_THREADS)
+
+static ofstream *out_stream = 0;
+
+static const int NUM_INVOCATIONS = 100;
+static const int TASK_COUNT = 130;
+
+class Test_Task : public ACE_Task<ACE_MT_SYNCH>
+{
+public:
+ Test_Task (void);
+ ~Test_Task (void);
+
+ virtual int open (void *args = 0);
+ virtual int close (u_long flags = 0);
+ virtual int put (ACE_Message_Block *, ACE_Time_Value *tv = 0);
+ virtual int svc (void);
+
+ virtual int handle_input (ACE_HANDLE fd);
+
+ ACE_Reactor *r_;
+ int handled_;
+ static int current_count_;
+ static int done_cnt_;
+};
+
+int Test_Task::current_count_ = 0;
+int Test_Task::done_cnt_ = 0;
+
+static ACE_Thread_Mutex lock_;
+
+Test_Task::Test_Task (void)
+{
+ ACE_GUARD (ACE_Thread_Mutex, ace_mon, lock_);
+
+ this->handled_ = 0;
+ Test_Task::current_count_++;
+ ACE_DEBUG ((LM_DEBUG,
+ "Test_Task constructed, current_count_ = %d\n",
+ Test_Task::current_count_));
+}
+
+Test_Task::~Test_Task (void)
+{
+ ACE_GUARD (ACE_Thread_Mutex, ace_mon, lock_);
+
+ ACE_DEBUG ((LM_DEBUG, "Test_Task destroyed, current_count_ = %d\n",
+ Test_Task::current_count_));
+}
+
+int
+Test_Task::open (void *args)
+{
+ r_ = (ACE_Reactor *) args;
+ return ACE_Task<ACE_MT_SYNCH>::activate (THR_NEW_LWP);
+}
+
+int
+Test_Task::close (u_long)
+{
+ ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, lock_, -1);
+
+ Test_Task::current_count_--;
+ ACE_DEBUG ((LM_DEBUG, "Test_Task::close () current_count_ = %d.\n",
+ Test_Task::current_count_));
+ return 0;
+}
+
+int
+Test_Task::put (ACE_Message_Block *, ACE_Time_Value *)
+{
+ return 0;
+}
+
+Test_Task::svc (void)
+{
+ // Every thread must register the same stream to write to file.
+ if (out_stream)
+ {
+ ACE_LOG_MSG->set_flags (ACE_Log_Msg::OSTREAM);
+ ACE_LOG_MSG->msg_ostream (out_stream);
+ }
+
+ for (int index = 0; index < NUM_INVOCATIONS; index++)
+ {
+ ACE_OS::thr_yield ();
+
+ if (r_->notify (this, ACE_Event_Handler::READ_MASK))
+ {
+ ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, lock_, -1);
+
+ ACE_DEBUG ((LM_DEBUG, "Test_Task: error notifying reactor!\n"));
+ }
+ }
+
+ ACE_DEBUG ((LM_DEBUG, " (%t) returning from svc ()\n"));
+ return 0;
+}
+
+int
+Test_Task::handle_input (ACE_HANDLE)
+{
+ this->handled_++;
+
+ if (this->handled_ == NUM_INVOCATIONS)
+ {
+ ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, lock_, -1);
+ Test_Task::done_cnt_++;
+ ACE_DEBUG ((LM_DEBUG,
+ " (%t) Test_Task: handle_input! done_cnt_ = %d.\n",
+ Test_Task::done_cnt_));
+ }
+
+ ACE_OS::thr_yield ();
+ return -1;
+}
+
+static void *
+dispatch (void *arg)
+{
+ // every thread must register the same stream to write to file
+ if (out_stream)
+ {
+ ACE_LOG_MSG->set_flags (ACE_Log_Msg::OSTREAM);
+ ACE_LOG_MSG->msg_ostream (out_stream);
+ }
+
+ ACE_DEBUG ((LM_DEBUG, " (%t) Dispatcher Thread started!\n"));
+ ACE_Reactor *r = (ACE_Reactor *) arg;
+ int result;
+
+ r->owner (ACE_OS::thr_self ());
+
+ while (1)
+ {
+ result = r->handle_events ();
+ if (result <= 0)
+ ACE_DEBUG ((LM_DEBUG, "Dispatch: handle_events (): %d", result));
+ }
+
+ return 0;
+}
+
+extern "C" void
+handler (int)
+{
+ *out_stream << flush;
+ out_stream->close ();
+ ACE_OS::exit (42);
+}
+
+int
+main (int argc, char **)
+{
+ if (argc > 1)
+ {
+ // Send output to file.
+ out_stream = new ofstream ("test_task_three.out", ios::trunc|ios::out);
+ ACE_LOG_MSG->set_flags (ACE_Log_Msg::OSTREAM);
+ ACE_LOG_MSG->msg_ostream (out_stream);
+ }
+
+ // Register a signal handler.
+ ACE_Sig_Action sa (ACE_SignalHandler (handler), SIGINT);
+
+ ACE_Reactor *reactor1 = ACE_Service_Config::reactor ();
+ ACE_Reactor *reactor2 = new ACE_Reactor ();
+
+ Test_Task t1[TASK_COUNT];
+ Test_Task t2[TASK_COUNT];
+
+ ACE_Thread::spawn (ACE_THR_FUNC (dispatch), reactor2);
+
+ reactor1->owner (ACE_OS::thr_self ());
+
+ for (int index = 0; index < TASK_COUNT; index++)
+ {
+ t1[index].open (reactor1);
+ t2[index].open (reactor2);
+ }
+
+ ACE_OS::sleep (3);
+
+ for (;;)
+ {
+ ACE_Time_Value timeout (2);
+
+ if (reactor1->handle_events (timeout) <= 0)
+ {
+ if (errno == ETIME)
+ {
+ ACE_DEBUG ((LM_DEBUG, "no activity within 2 seconds, shutting down\n"));
+ break;
+ }
+ else
+ ACE_ERROR ((LM_ERROR, "%p error handling events\n", "main"));
+ }
+ }
+
+ return 0;
+}
+
+#else
+int
+main (int, char *[])
+{
+ ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n"));
+ return 0;
+}
+#endif /* ACE_HAS_THREADS */
diff --git a/examples/Threads/task_two.cpp b/examples/Threads/task_two.cpp
new file mode 100644
index 00000000000..1c6366c4b12
--- /dev/null
+++ b/examples/Threads/task_two.cpp
@@ -0,0 +1,156 @@
+// $Id$
+
+// Exercise more tests for the ACE Tasks. This test can spawn off
+// zillions of tasks and then wait for them using both polling and the
+// ACE Thread Manager.
+
+#include "ace/Task.h"
+
+#include "ace/Service_Config.h"
+#include "ace/Synch.h"
+
+#if defined (ACE_HAS_THREADS)
+
+typedef ACE_Atomic_Op<ACE_Thread_Mutex, int> ATOMIC_INT;
+
+static u_long zero = 0;
+static ATOMIC_INT task_count (zero);
+static ATOMIC_INT max_count (zero);
+static ATOMIC_INT wait_count (zero);
+
+static int n_threads = 0;
+
+// Default number of tasks.
+static const int default_threads = ACE_DEFAULT_THREADS;
+
+// Default number of times to run the test.
+static const int default_iterations = 1000;
+
+class Task_Test : public ACE_Task<ACE_MT_SYNCH>
+{
+public:
+ virtual int open (void *args = 0);
+ virtual int close (u_long flags = 0);
+ virtual int put (ACE_Message_Block *, ACE_Time_Value *tv = 0);
+ virtual int svc (void);
+
+private:
+ static ACE_Thread_Mutex lock_;
+};
+
+ACE_Thread_Mutex Task_Test::lock_;
+
+int
+Task_Test::open (void *)
+{
+ ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, Task_Test::lock_, -1);
+
+ task_count++;
+ ACE_DEBUG ((LM_DEBUG, "(%t) creating Task_Test, task count = %d\n",
+ (u_long) task_count));
+
+ return this->activate (THR_BOUND);
+}
+
+int
+Task_Test::close (u_long)
+{
+ ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, Task_Test::lock_, -1);
+
+ task_count--;
+ ACE_DEBUG ((LM_DEBUG, "(%t) destroying Task_Test, task count = %d\n",
+ (u_long) task_count));
+ wait_count--;
+// delete this;
+ return 0;
+}
+
+int
+Task_Test::put (ACE_Message_Block *,
+ ACE_Time_Value *)
+{
+ return 0;
+}
+
+int
+Task_Test::svc (void)
+{
+ wait_count++;
+ max_count++;
+
+ ACE_DEBUG ((LM_DEBUG, "(%t) svc: waiting\n"));
+
+ for (;;)
+ if (max_count >= n_threads)
+ break;
+ else
+ ACE_Thread::yield ();
+
+ ACE_DEBUG ((LM_DEBUG, "(%t) svc: finished waiting\n"));
+ return 0;
+}
+
+int
+main (int argc, char *argv[])
+{
+ n_threads = argc > 1 ? ACE_OS::atoi (argv[1]) : default_threads;
+ int n_iterations = argc > 2 ? ACE_OS::atoi (argv[2]) : default_iterations;
+
+ Task_Test **task_array = new Task_Test *[n_threads];
+
+ for (int i = 1; i <= n_iterations; i++)
+ {
+ ACE_DEBUG ((LM_DEBUG, "(%t) iteration = %d, max_count %d\n",
+ i, (u_long) max_count));
+ max_count = 0;
+
+ ACE_DEBUG ((LM_DEBUG, "(%t) starting %d task%s\n",
+ n_threads, n_threads == 1 ? "" : "s"));
+
+ // Launch the new tasks.
+ for (int j = 0; j < n_threads; j++)
+ {
+ task_array[j] = new Task_Test;
+ // Activate the task, i.e., make it an active object.
+ task_array[j]->open ();
+ }
+
+ // Wait for initialization to kick in.
+ while (max_count == 0)
+ ACE_Thread::yield ();
+
+ ACE_DEBUG ((LM_DEBUG, "(%t) waiting for threads to finish\n"));
+
+ // Wait for the threads to finish this iteration.
+ while (max_count != n_threads && wait_count != 0)
+ ACE_Thread::yield ();
+
+ ACE_DEBUG ((LM_DEBUG, "(%t) iteration %d finished, max_count %d, wait_count %d, waiting for tasks to exit\n",
+ i, (u_long) max_count, (u_long) wait_count));
+
+ // Wait for all the tasks to exit.
+ ACE_Service_Config::thr_mgr ()->wait ();
+
+ // Delete the existing tasks.
+ for (int k = 0; k < n_threads; k++)
+ delete task_array[k];
+ }
+
+ delete [] task_array;
+
+ ACE_DEBUG ((LM_DEBUG, "(%t) shutting down the test\n"));
+ return 0;
+}
+
+#if defined (ACE_TEMPLATES_REQUIRE_SPECIALIZATION)
+template class ACE_Atomic_Op<ACE_Thread_Mutex, u_long>;
+#endif /* ACE_TEMPLATES_REQUIRE_SPECIALIZATION */
+
+#else
+int
+main (int, char *[])
+{
+ ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n"));
+ return 0;
+}
+#endif /* ACE_HAS_THREADS */
diff --git a/examples/Threads/thread_manager.cpp b/examples/Threads/thread_manager.cpp
new file mode 100644
index 00000000000..73029d70d88
--- /dev/null
+++ b/examples/Threads/thread_manager.cpp
@@ -0,0 +1,104 @@
+// $Id$
+
+// Test out the group management mechanisms provided by the
+// ACE_Thread_Manager, including the group signal handling, group
+// suspension and resumption, and cooperative thread cancellation
+// mechanisms.
+
+#include "ace/Service_Config.h"
+#include "ace/Thread_Manager.h"
+
+#if defined (ACE_HAS_THREADS)
+
+extern "C" void
+handler (int signum)
+{
+ ACE_DEBUG ((LM_DEBUG, "(%t) received signal %d\n", signum));
+}
+
+static void *
+worker (int iterations)
+{
+ ACE_Thread_Control tc (ACE_Service_Config::thr_mgr ());
+
+ for (int i = 0; i < iterations; i++)
+ {
+ if ((i % 1000) == 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) checking cancellation before iteration %d!\n",
+ i));
+
+ if (ACE_Service_Config::thr_mgr ()->testcancel (ACE_Thread::self ()) != 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) has been cancelled before iteration %d!\n",
+ i));
+ break;
+ }
+ }
+ }
+
+ // Destructor removes thread from Thread_Manager.
+ return 0;
+}
+
+static const int DEFAULT_THREADS = ACE_DEFAULT_THREADS;
+static const int DEFAULT_ITERATIONS = 100000;
+
+int
+main (int argc, char *argv[])
+{
+ ACE_Service_Config daemon;
+
+ daemon.open (argv[0]);
+
+ // Register a signal handler.
+ ACE_Sig_Action sa ((ACE_SignalHandler) handler, SIGINT);
+
+ int n_threads = argc > 1 ? ACE_OS::atoi (argv[1]) : DEFAULT_THREADS;
+ int n_iterations = argc > 2 ? ACE_OS::atoi (argv[2]) : DEFAULT_ITERATIONS;
+
+ ACE_Thread_Manager *thr_mgr = ACE_Service_Config::thr_mgr ();
+
+ int grp_id = thr_mgr->spawn_n (n_threads, ACE_THR_FUNC (worker),
+ (void *) n_iterations,
+ THR_NEW_LWP | THR_DETACHED);
+
+ // Wait for 1 second and then suspend every thread in the group.
+ ACE_OS::sleep (1);
+ ACE_DEBUG ((LM_DEBUG, "(%t) suspending group\n"));
+ if (thr_mgr->suspend_grp (grp_id) == -1)
+ ACE_ERROR ((LM_DEBUG, "(%t) %p\n", "suspend_grp"));
+
+ // Wait for 1 more second and then resume every thread in the
+ // group.
+ ACE_OS::sleep (ACE_Time_Value (1));
+ ACE_DEBUG ((LM_DEBUG, "(%t) resuming group\n"));
+ if (thr_mgr->resume_grp (grp_id) == -1)
+ ACE_ERROR ((LM_DEBUG, "(%t) %p\n", "resume_grp"));
+
+ // Wait for 1 more second and then send a SIGINT to every thread in
+ // the group.
+ ACE_OS::sleep (ACE_Time_Value (1));
+ ACE_DEBUG ((LM_DEBUG, "(%t) signaling group\n"));
+ if (thr_mgr->kill_grp (grp_id, SIGINT) == -1)
+ ACE_ERROR ((LM_DEBUG, "(%t) %p\n", "kill_grp"));
+
+ // Wait for 1 more second and then cancel all the threads.
+ ACE_OS::sleep (ACE_Time_Value (1));
+ ACE_DEBUG ((LM_DEBUG, "(%t) cancelling group\n"));
+ if (thr_mgr->cancel_grp (grp_id) == -1)
+ ACE_ERROR ((LM_DEBUG, "(%t) %p\n", "cancel_grp"));
+
+ // Perform a barrier wait until all the threads have shut down.
+ thr_mgr->wait ();
+ return 0;
+}
+#else
+int
+main (int, char *[])
+{
+ ACE_ERROR_RETURN ((LM_ERROR, "threads not supported on this platform\n"), -1);
+}
+#endif /* ACE_HAS_THREADS */
diff --git a/examples/Threads/thread_pool.cpp b/examples/Threads/thread_pool.cpp
new file mode 100644
index 00000000000..9478ed0883d
--- /dev/null
+++ b/examples/Threads/thread_pool.cpp
@@ -0,0 +1,214 @@
+// This test program illustrates how the ACE task synchronization
+// $Id$
+
+// mechanisms work in conjunction with the ACE_Task and the
+// ACE_Thread_Manager. If the manual flag is not set input comes from
+// stdin until the user enters a return only. This stops all workers
+// via a message block of length 0. This is an alternative shutdown of
+// workers compared to queue deactivate.
+//
+// This code is original based on a test program written by Karlheinz
+// Dorn. It was modified to utilize more "ACE" features by Doug Schmidt.
+
+#include "ace/Task.h"
+#include "ace/Service_Config.h"
+
+#include "ace/Task.h"
+
+#if defined (ACE_HAS_THREADS)
+
+// Number of iterations to run the test.
+static int n_iterations = 100;
+
+class Thread_Pool : public ACE_Task<ACE_MT_SYNCH>
+{
+public:
+ Thread_Pool (ACE_Thread_Manager *thr_mgr, int n_threads);
+
+ virtual int svc (void);
+ // Iterate <n_iterations> time printing off a message and "waiting"
+ // for all other threads to complete this iteration.
+
+ virtual int put (ACE_Message_Block *mb, ACE_Time_Value *tv=0);
+ // This allows the producer to pass messages to the <Thread_Pool>.
+
+private:
+ virtual int close (u_long);
+
+ // = Not needed for this test.
+ virtual int open (void *) { return 0; }
+};
+
+int
+Thread_Pool::close (u_long)
+{
+ ACE_DEBUG ((LM_DEBUG, "(%t) close of worker\n"));
+ return 0;
+}
+
+Thread_Pool::Thread_Pool (ACE_Thread_Manager *thr_mgr,
+ int n_threads)
+ : ACE_Task<ACE_MT_SYNCH> (thr_mgr)
+{
+ // Create worker threads.
+ if (this->activate (THR_NEW_LWP, n_threads) == -1)
+ ACE_ERROR ((LM_ERROR, "%p\n", "activate failed"));
+}
+
+// Simply enqueue the Message_Block into the end of the queue.
+
+int
+Thread_Pool::put (ACE_Message_Block *mb, ACE_Time_Value *tv)
+{
+ return this->putq (mb, tv);
+}
+
+// Iterate <n_iterations> time printing off a message and "waiting"
+// for all other threads to complete this iteration.
+
+int
+Thread_Pool::svc (void)
+{
+ // Note that the ACE_Task::svc_run () method automatically adds us to
+ // the Thread_Manager when the thread begins.
+
+ int result = 0;
+ int count = 1;
+
+ // Keep looping, reading a message out of the queue, until we get a
+ // message with a length == 0, which signals us to quit.
+
+ for (;; count++)
+ {
+ ACE_Message_Block *mb;
+
+ ACE_DEBUG ((LM_DEBUG, "(%t) in iteration %d before getq ()\n", count));
+
+ if (this->getq (mb) == -1)
+ {
+ ACE_ERROR ((LM_ERROR,
+ "(%t) in iteration %d, got result -1, exiting\n", count));
+ break;
+ }
+
+ int length = mb->length ();
+
+ if (length > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) in iteration %d, length = %d, text = \"%*s\"\n",
+ count, length, length - 1, mb->rd_ptr ()));
+
+ // We're responsible for deallocating this.
+ delete mb;
+
+ if (length == 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) in iteration %d, got NULL message, exiting\n",
+ count));
+ break;
+ }
+ }
+
+ // Note that the ACE_Task::svc_run () method automatically removes
+ // us from the Thread_Manager when the thread exits.
+ return 0;
+}
+
+static void
+produce (Thread_Pool &thread_pool)
+{
+ ACE_DEBUG ((LM_DEBUG, "(%t) producer start, dumping the Thread_Pool\n"));
+ thread_pool.dump ();
+
+ for (int n;;)
+ {
+ // Allocate a new message.
+ ACE_Message_Block *mb = new ACE_Message_Block (BUFSIZ);
+
+#if defined (manual)
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) press chars and enter to put a new message into task queue..."));
+ n = ACE_OS::read (ACE_STDIN, mb->rd_ptr (), mb->size ());
+#else // Automatically generate messages.
+ static int count = 0;
+
+ ACE_OS::sprintf (mb->rd_ptr (), "%d\n", count);
+
+ n = ACE_OS::strlen (mb->rd_ptr ());
+
+ if (count == n_iterations)
+ n = 1; // Indicate that we need to shut down.
+ else
+ count++;
+
+ if (count == 0 || (count % 20 == 0))
+ ACE_OS::sleep (1);
+#endif /* manual */
+ if (n > 1)
+ {
+ // Send a normal message to the waiting threads and continue
+ // producing.
+ mb->wr_ptr (n);
+
+ // Pass the message to the Thread_Pool.
+ if (thread_pool.put (mb) == -1)
+ ACE_ERROR ((LM_ERROR, " (%t) %p\n", "put"));
+ }
+ else
+ {
+ // Send a shutdown message to the waiting threads and exit.
+ ACE_DEBUG ((LM_DEBUG, "\n(%t) start loop, dump of task:\n"));
+ thread_pool.dump ();
+
+ for (int i = thread_pool.thr_count (); i > 0; i--)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) EOF, enqueueing NULL block for thread = %d\n",
+ i));
+
+ // Enqueue a NULL message to flag each consumer to
+ // shutdown.
+ if (thread_pool.put (new ACE_Message_Block) == -1)
+ ACE_ERROR ((LM_ERROR, " (%t) %p\n", "put"));
+ }
+
+ ACE_DEBUG ((LM_DEBUG, "\n(%t) end loop, dump of task:\n"));
+ thread_pool.dump ();
+ break;
+ }
+ }
+}
+
+int
+main (int argc, char *argv[])
+{
+ int n_threads = argc > 1 ? ACE_OS::atoi (argv[1]) : ACE_DEFAULT_THREADS;
+ n_iterations = argc > 2 ? ACE_OS::atoi (argv[2]) : n_iterations;
+
+ ACE_DEBUG ((LM_DEBUG, "(%t) argc = %d, threads = %d\n",
+ argc, n_threads));
+
+ // Create the worker tasks.
+ Thread_Pool thread_pool (ACE_Service_Config::thr_mgr (),
+ n_threads);
+
+ // Create work for the worker tasks to process in their own threads.
+ produce (thread_pool);
+
+ // Wait for all the threads to reach their exit point.
+
+ ACE_DEBUG ((LM_DEBUG, "(%t) waiting with thread manager...\n"));
+ ACE_Service_Config::thr_mgr ()->wait ();
+
+ ACE_DEBUG ((LM_DEBUG, "(%t) destroying worker tasks and exiting...\n"));
+ return 0;
+}
+#else
+int
+main (int, char *[])
+{
+ ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n"));
+ return 0;
+}
+#endif /* ACE_HAS_THREADS */
diff --git a/examples/Threads/thread_specific.cpp b/examples/Threads/thread_specific.cpp
new file mode 100644
index 00000000000..f7a4f6dccf3
--- /dev/null
+++ b/examples/Threads/thread_specific.cpp
@@ -0,0 +1,219 @@
+#include "ace/Service_Config.h"
+// $Id$
+
+#include "ace/Synch.h"
+
+#if defined (ACE_HAS_THREADS)
+
+// Define a class that will be stored in thread-specific data. Note
+// that as far as this class is concerned it's just a regular C++
+// class. The ACE_TSS wrapper transparently ensures that
+// objects of this class will be placed in thread-specific storage.
+// All calls on ACE_TSS::operator->() are delegated to the
+// appropriate method in the Errno class.
+
+class Errno
+{
+public:
+ int error (void) { return this->errno_; }
+ void error (int i) { this->errno_ = i; }
+
+ int line (void) { return this->lineno_; }
+ void line (int l) { this->lineno_ = l; }
+
+ // Errno::flags_ is a static variable, so we've got to protect it
+ // with a mutex since it isn't kept in thread-specific storage.
+ int flags (void)
+ {
+ ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, Errno::lock_, -1);
+
+ return Errno::flags_;
+ }
+
+ void flags (int f)
+ {
+ ACE_GUARD (ACE_Thread_Mutex, ace_mon, Errno::lock_);
+
+ Errno::flags_ = f;
+ }
+
+private:
+ // = errno_ and lineno_ will be thread-specific data so they don't
+ // need a lock.
+ int errno_;
+ int lineno_;
+
+ static int flags_;
+#if defined (ACE_HAS_THREADS)
+ // flags_ needs a lock.
+ static ACE_Thread_Mutex lock_;
+#endif /* ACE_HAS_THREADS */
+};
+
+// Static variables.
+ACE_MT (ACE_Thread_Mutex Errno::lock_);
+int Errno::flags_;
+
+// This is our thread-specific error handler...
+static ACE_TSS<Errno> TSS_Error;
+
+#if defined (ACE_HAS_THREADS)
+// Serializes output via cout.
+static ACE_Thread_Mutex lock;
+
+typedef ACE_TSS_Guard<ACE_Thread_Mutex> GUARD;
+#else
+// Serializes output via cout.
+static ACE_Null_Mutex lock;
+
+typedef ACE_Guard<ACE_Null_Mutex> GUARD;
+#endif /* ACE_HAS_THREADS */
+
+static void
+cleanup (void *ptr)
+{
+ ACE_DEBUG ((LM_DEBUG, "(%t) in cleanup, ptr = %x\n", ptr));
+
+ delete ptr;
+}
+
+// This worker function is the entry point for each thread.
+
+static void *
+worker (void *c)
+{
+ ACE_Thread_Control tc (ACE_Service_Config::thr_mgr ());
+ int count = int (c);
+
+ ACE_thread_key_t key = 0;
+ int *ip = 0;
+
+ // Make one key that will be available when the thread exits so that
+ // we'll have something to cleanup!
+
+ if (ACE_OS::thr_keycreate (&key, cleanup) == -1)
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n", "ACE_OS::thr_keycreate"));
+
+ ip = new int;
+
+ if (ACE_OS::thr_setspecific (key, (void *) ip) == -1)
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n", "ACE_OS::thr_setspecific"));
+
+ for (int i = 0; i < count; i++)
+ {
+ if (ACE_OS::thr_keycreate (&key, cleanup) == -1)
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n", "ACE_OS::thr_keycreate"));
+
+ ip = new int;
+
+ ACE_DEBUG ((LM_DEBUG, "(%t) in worker 1, key = %d, ip = %x\n", key, ip));
+
+ if (ACE_OS::thr_setspecific (key, (void *) ip) == -1)
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n", "ACE_OS::thr_setspecific"));
+
+ if (ACE_OS::thr_getspecific (key, (void **) &ip) == -1)
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n", "ACE_OS::thr_setspecific"));
+
+ if (ACE_OS::thr_setspecific (key, (void *) 0) == -1)
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n", "ACE_OS::thr_setspecific"));
+
+ delete ip;
+
+ if (ACE_OS::thr_keyfree (key) == -1)
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n", "ACE_OS::thr_keyfree"));
+
+ // Cause an error.
+ ACE_OS::read (ACE_INVALID_HANDLE, 0, 0);
+
+ // The following two lines set the thread-specific state.
+ TSS_Error->error (errno);
+ TSS_Error->line (__LINE__);
+
+ // This sets the static state (note how C++ makes it easy to do
+ // both).
+ TSS_Error->flags (count);
+
+ {
+ // Use the guard to serialize access to cout...
+ ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, lock, 0);
+
+ cout << "(" << ACE_Thread::self ()
+ << ") errno = " << TSS_Error->error ()
+ << ", lineno = " << TSS_Error->line ()
+ << ", flags = " << TSS_Error->flags ()
+ << endl;
+ }
+ key = 0;
+
+ if (ACE_OS::thr_keycreate (&key, cleanup) == -1)
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n", "ACE_OS::thr_keycreate"));
+
+ ip = new int;
+
+ ACE_DEBUG ((LM_DEBUG, "(%t) in worker 2, key = %d, ip = %x\n", key, ip));
+
+ if (ACE_OS::thr_setspecific (key, (void *) ip) == -1)
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n", "ACE_OS::thr_setspecific"));
+
+ if (ACE_OS::thr_getspecific (key, (void **) &ip) == -1)
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n", "ACE_OS::thr_setspecific"));
+
+ if (ACE_OS::thr_setspecific (key, (void *) 0) == -1)
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n", "ACE_OS::thr_setspecific"));
+
+ delete ip;
+
+ if (ACE_OS::thr_keyfree (key) == -1)
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n", "ACE_OS::thr_keyfree"));
+ }
+
+ ACE_DEBUG ((LM_DEBUG, "(%t) exiting\n"));
+ return 0;
+}
+
+extern "C" void
+handler (int signum)
+{
+ ACE_DEBUG ((LM_DEBUG, "signal = %S\n", signum));
+ ACE_Service_Config::thr_mgr ()->exit (0);
+}
+
+int
+main (int argc, char *argv[])
+{
+ // The Service_Config must be the first object defined in main...
+ ACE_Service_Config daemon (argv[0]);
+ ACE_Thread_Control tc (ACE_Service_Config::thr_mgr ());
+ int threads = argc > 1 ? ACE_OS::atoi (argv[1]) : 4;
+ int count = argc > 2 ? ACE_OS::atoi (argv[2]) : 10000;
+
+ // Register a signal handler.
+ ACE_Sig_Action sa ((ACE_SignalHandler) (handler), SIGINT);
+
+#if defined (ACE_HAS_THREADS)
+ if (ACE_Service_Config::thr_mgr ()->spawn_n (threads,
+ ACE_THR_FUNC (&worker),
+ (void *) count,
+ THR_BOUND | THR_DETACHED) == -1)
+ ACE_OS::perror ("ACE_Thread_Manager::spawn_n");
+
+ ACE_Service_Config::thr_mgr ()->wait ();
+#else
+ worker ((void *) count);
+#endif /* ACE_HAS_THREADS */
+ return 0;
+}
+
+#if defined (ACE_TEMPLATES_REQUIRE_SPECIALIZATION)
+template class ACE_TSS<Errno>;
+#endif /* ACE_TEMPLATES_REQUIRE_SPECIALIZATION */
+
+#else
+int
+main (void)
+{
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "ACE doesn't support support threads on this platform (yet)\n"),
+ -1);
+}
+#endif /* ACE_HAS_THREADS */
diff --git a/examples/Threads/token.cpp b/examples/Threads/token.cpp
new file mode 100644
index 00000000000..5a51496d011
--- /dev/null
+++ b/examples/Threads/token.cpp
@@ -0,0 +1,76 @@
+// Test out the ACE Token class.
+// $Id$
+
+#include "ace/Token.h"
+#include "ace/Task.h"
+
+#if defined (ACE_HAS_THREADS)
+
+class My_Task : public ACE_Task<ACE_MT_SYNCH>
+{
+public:
+ My_Task (int n);
+ virtual int open (void *) { return 0; }
+ virtual int close (u_long) { return 0; }
+ virtual int put (ACE_Message_Block *, ACE_Time_Value *) { return 0; }
+ virtual int svc (void);
+
+ static void sleep_hook (void *);
+
+private:
+ ACE_Token token_;
+};
+
+My_Task::My_Task (int n)
+{
+ // Make this Task into an Active Object.
+ this->activate (THR_BOUND | THR_DETACHED, n);
+
+ // Wait for all the threads to exit.
+ this->thr_mgr ()->wait ();
+}
+
+void
+My_Task::sleep_hook (void *)
+{
+ cerr << '(' << ACE_Thread::self () << ')'
+ << " blocking, My_Task::sleep_hook () called" << endl;
+}
+
+// Test out the behavior of the ACE_Token class.
+
+int
+My_Task::svc (void)
+{
+ for (int i = 0; i < 10000; i++)
+ {
+ // Wait for up to 1 millisecond past the current time to get the token.
+ ACE_Time_Value timeout (ACE_OS::time (0), 1000);
+
+ if (this->token_.acquire (&My_Task::sleep_hook, 0, &timeout) == 1)
+ {
+ this->token_.acquire ();
+ this->token_.renew ();
+ this->token_.release ();
+ this->token_.release ();
+ }
+ else
+ ACE_Thread::yield ();
+ }
+ return 0;
+}
+
+int
+main (int argc, char *argv[])
+{
+ My_Task tasks (argc > 1 ? atoi (argv[1]) : 4);
+
+ return 0;
+}
+#else
+int
+main (void)
+{
+ ACE_ERROR_RETURN ((LM_ERROR, "your platform doesn't support threads\n"), -1);
+}
+#endif /* */
diff --git a/examples/Threads/tss1.cpp b/examples/Threads/tss1.cpp
new file mode 100644
index 00000000000..7efdc9dc3ef
--- /dev/null
+++ b/examples/Threads/tss1.cpp
@@ -0,0 +1,164 @@
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// tests
+//
+// = FILENAME
+// TSS_Test.cpp
+//
+// = DESCRIPTION
+// This program tests thread specific storage of data. The ACE_TSS
+// wrapper transparently ensures that the objects of this class
+// will be placed in thread-specific storage. All calls on
+// ACE_TSS::operator->() are delegated to the appropriate method
+// in the Errno class.
+//
+// = AUTHOR
+// Detlef Becker
+//
+// ============================================================================
+
+#include "ace/Service_Config.h"
+#include "ace/Synch.h"
+#include "ace/Task.h"
+
+#if defined (ACE_HAS_THREADS)
+
+static int iterations = 100;
+
+class Errno
+{
+public:
+ int error (void) { return this->errno_; }
+ void error (int i) { this->errno_ = i; }
+
+ int line (void) { return this->lineno_; }
+ void line (int l) { this->lineno_ = l; }
+
+ // Errno::flags_ is a static variable, so we've got to protect it
+ // with a mutex since it isn't kept in thread-specific storage.
+ int flags (void) {
+ ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_Mon, Errno::lock_, -1));
+
+ return Errno::flags_;
+ }
+ int flags (int f)
+ {
+ ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, Errno::lock_, -1));
+
+ Errno::flags_ = f;
+ return 0;
+ }
+
+private:
+ // = errno_ and lineno_ will be thread-specific data so they don't
+ // need a lock.
+ int errno_;
+ int lineno_;
+
+ static int flags_;
+#if defined (ACE_HAS_THREADS)
+ // flags_ needs a lock.
+ static ACE_Thread_Mutex lock_;
+#endif /* ACE_HAS_THREADS */
+};
+
+// Static variables.
+ACE_MT (ACE_Thread_Mutex Errno::lock_);
+int Errno::flags_;
+
+// This is our thread-specific error handler...
+static ACE_TSS<Errno> TSS_Error;
+
+#if defined (ACE_HAS_THREADS)
+// Serializes output via cout.
+static ACE_Thread_Mutex lock;
+
+typedef ACE_TSS_Guard<ACE_Thread_Mutex> GUARD;
+#else
+// Serializes output via cout.
+static ACE_Null_Mutex lock;
+
+typedef ACE_Guard<ACE_Null_Mutex> GUARD;
+#endif /* ACE_HAS_THREADS */
+
+// Keeps track of whether Tester::close () has started.
+static int close_started = 0;
+
+template <ACE_SYNCH_1>
+class Tester: public ACE_Task<ACE_SYNCH_2>
+{
+public:
+ Tester (void) {}
+ ~Tester (void) {}
+
+ virtual int open (void *theArgs = 0);
+ virtual int close (u_long theArg = 0);
+ virtual int put (ACE_Message_Block *theMsgBlock,
+ ACE_Time_Value *theTimeVal = 0);
+ virtual int svc (void);
+};
+
+template <ACE_SYNCH_1> int
+Tester<ACE_SYNCH_2>::open (void *)
+{
+ return this->activate ();
+}
+
+template <ACE_SYNCH_1>
+int Tester<ACE_SYNCH_2>::close (u_long)
+{
+ ACE_DEBUG ((LM_DEBUG, "close running\n!"));
+ close_started = 1;
+ ACE_OS::sleep (2);
+ ACE_DEBUG ((LM_DEBUG, "close: trying to log error code 7!\n"));
+ TSS_Error->error (7);
+ ACE_DEBUG ((LM_DEBUG, "close: logging succeeded!\n"));
+ return 0;
+}
+
+template <ACE_SYNCH_1> int
+Tester<ACE_SYNCH_2>::put (ACE_Message_Block *, ACE_Time_Value *)
+{
+ return 0;
+}
+
+template <ACE_SYNCH_1> int
+Tester<ACE_SYNCH_2>::svc (void)
+{
+ return 0;
+}
+
+int
+main (int, char *[])
+{
+ Tester<ACE_MT_SYNCH> tester;
+
+ tester.open ();
+
+ while (!close_started)
+ continue;
+
+ ACE_DEBUG ((LM_DEBUG, "main: trying to log error code 7!\n"));
+
+ TSS_Error->error (3);
+
+ ACE_DEBUG ((LM_DEBUG, "main: logging succeeded!\n"));
+ return 0;
+}
+
+#if defined (ACE_TEMPLATES_REQUIRE_SPECIALIZATION)
+template class ACE_TSS<Errno>;
+#endif /* ACE_TEMPLATES_REQUIRE_SPECIALIZATION */
+
+#else
+int
+main (void)
+{
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "ACE doesn't support support threads on this platform (yet)\n"),
+ -1);
+}
+#endif /* ACE_HAS_THREADS */
diff --git a/examples/Threads/tss2.cpp b/examples/Threads/tss2.cpp
new file mode 100644
index 00000000000..24a8d958e91
--- /dev/null
+++ b/examples/Threads/tss2.cpp
@@ -0,0 +1,252 @@
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// tests
+//
+// = FILENAME
+// TSS_Test.cpp
+//
+// = DESCRIPTION
+// This program tests thread specific storage of data. The ACE_TSS
+// wrapper transparently ensures that the objects of this class
+// will be placed in thread-specific storage. All calls on
+// ACE_TSS::operator->() are delegated to the appropriate method
+// in the Errno class.
+//
+// = AUTHOR
+// Prashant Jain and Doug Schmidt
+//
+// ============================================================================
+
+#include "ace/Task.h"
+#include "ace/Token.h"
+
+#if defined (ACE_HAS_THREADS)
+
+class TSS_Obj
+{
+public:
+
+ TSS_Obj (void);
+ ~TSS_Obj (void);
+
+private:
+ static int count_;
+ static ACE_Thread_Mutex lock_;
+};
+
+int TSS_Obj::count_ = 0;
+ACE_Thread_Mutex TSS_Obj::lock_;
+
+TSS_Obj::TSS_Obj (void)
+{
+ ACE_GUARD (ACE_Thread_Mutex, ace_mon, lock_);
+
+ count_++;
+ cout << "TO+ : " << count_ << endl;
+}
+
+TSS_Obj::~TSS_Obj (void)
+{
+ ACE_GUARD (ACE_Thread_Mutex, ace_mon, lock_);
+
+ count_--;
+ cout << "TO- : " << count_ << endl;
+}
+
+class Test_Task
+{
+public:
+
+ Test_Task (void);
+ ~Test_Task (void);
+
+ int open (void *arg);
+
+ static void *svc (void *arg);
+ static int wait_count_;
+ static int max_count_;
+
+private:
+ static int count_;
+};
+
+int Test_Task::count_ = 0;
+int Test_Task::wait_count_ = 0;
+int Test_Task::max_count_ = 0;
+int num_threads_ = 0;
+
+ACE_Token token;
+
+Test_Task::Test_Task (void)
+{
+ ACE_GUARD (ACE_Token, ace_mon, token);
+
+ count_++;
+ cout << "Test_Task+ : "
+ << count_ << " ("
+ << ACE_OS::thr_self ()
+ << ")" << endl;
+}
+
+Test_Task::~Test_Task (void)
+{
+ ACE_GUARD (ACE_Token, ace_mon, token);
+
+ count_--;
+ cout << "Test_Task- : "
+ << count_ << " ("
+ << ACE_OS::thr_self ()
+ << ")" << endl;
+
+ wait_count_--;
+}
+
+void *
+Test_Task::svc (void *arg)
+{
+ ACE_TSS<TSS_Obj> tss (new TSS_Obj);
+
+ {
+ ACE_GUARD_RETURN (ACE_Token, ace_mon, token, 0);
+
+ wait_count_++;
+ max_count_++;
+ cout << "svc: waiting (" << ACE_OS::thr_self () << ")" << endl;
+ }
+
+ while (1)
+ {
+ {
+ ACE_GUARD_RETURN (ACE_Token, ace_mon, token, 0);
+
+ if (max_count_ >= num_threads_)
+ break;
+ else
+ {
+ ace_mon.release ();
+ ACE_Thread::yield ();
+ ace_mon.acquire ();
+ }
+ }
+
+ {
+ ACE_GUARD_RETURN (ACE_Token, ace_mon, token, 0);
+
+ cout << "svc: waiting (" << ACE_OS::thr_self () << ") finished" << endl;
+ }
+ }
+
+ delete (Test_Task *) arg;
+
+ return 0;
+}
+
+int
+Test_Task::open (void *arg)
+{
+ if (ACE_Thread::spawn (Test_Task::svc, arg) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "ACE_Thread::spawn"), 0);
+
+ return 0;
+}
+
+int
+main (int argc, char **argv)
+{
+ if (argc != 2)
+ {
+ cout << "Missing parameters!" << endl;
+ return 1;
+ }
+
+ int num_Tasks = atoi (argv[1]);
+
+ num_threads_ = num_Tasks;
+
+ Test_Task **task_arr = (Test_Task**) new char[sizeof (Test_Task*) * num_Tasks];
+
+ while (1)
+ {
+ {
+ ACE_GUARD_RETURN (ACE_Token, ace_mon, token, -1);
+
+ cout << "ReseTest_Tasking Test_Task::max_count_ from: "
+ << Test_Task::max_count_ << endl;
+
+ Test_Task::max_count_ = 0;
+ }
+
+ for (int i = 0; i < num_Tasks; i++)
+ {
+ task_arr[i] = new Test_Task;
+ task_arr[i]->open (task_arr[i]);
+ }
+
+ cout << "Waiting for first thread started..." << endl;
+
+ for (;;)
+ {
+ ACE_GUARD_RETURN (ACE_Token, ace_mon, token, -1);
+
+ if (Test_Task::max_count_ != 0 )
+ {
+ ace_mon.release ();
+ ACE_Thread::yield ();
+ ace_mon.acquire ();
+ break;
+ }
+ ace_mon.release ();
+ ACE_Thread::yield ();
+ ace_mon.acquire ();
+ }
+
+ {
+ ACE_GUARD_RETURN (ACE_Token, ace_mon, token, -1);
+
+ cout << "First thread started!" << endl
+ << "Waiting for all threads finished..." << endl;
+ }
+
+ for (;;)
+ {
+ ACE_GUARD_RETURN (ACE_Token, ace_mon, token, -1);
+
+ if (!(Test_Task::max_count_ == num_threads_
+ && Test_Task::wait_count_ == 0))
+ {
+ ace_mon.release ();
+ ACE_Thread::yield ();
+ ace_mon.acquire ();
+ continue;
+ }
+
+ cout << "Test_Task::max_count_ = "
+ << Test_Task::max_count_
+ << " Test_Task::wait_count_ = "
+ << Test_Task::wait_count_
+ << endl;
+ break;
+ }
+
+ {
+ ACE_GUARD_RETURN (ACE_Token, ace_mon, token, -1);
+ cout << "All threads finished..." << endl;
+ }
+
+ ACE_OS::sleep (2);
+ }
+
+ return 0;
+}
+
+#else
+int
+main (int, char *[])
+{
+ ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n"));
+ return 0;
+}
+#endif /* ACE_HAS_THREADS */