summaryrefslogtreecommitdiff
path: root/examples/Reactor/WFMO_Reactor/test_multithreading.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'examples/Reactor/WFMO_Reactor/test_multithreading.cpp')
-rw-r--r--examples/Reactor/WFMO_Reactor/test_multithreading.cpp245
1 files changed, 245 insertions, 0 deletions
diff --git a/examples/Reactor/WFMO_Reactor/test_multithreading.cpp b/examples/Reactor/WFMO_Reactor/test_multithreading.cpp
new file mode 100644
index 00000000000..ee368d18c11
--- /dev/null
+++ b/examples/Reactor/WFMO_Reactor/test_multithreading.cpp
@@ -0,0 +1,245 @@
+// $Id$
+//
+// ============================================================================
+//
+// = LIBRARY
+// examples
+//
+// = FILENAME
+// test_multithreading.cpp
+//
+// = DESCRIPTION
+//
+// This application tests multiple threads simultaneously calling
+// Reactor::handle_events(). It also shows how different threads
+// can update the state of Reactor by registering and removing
+// Event_Handlers.
+//
+// Note that this test will only work with WFMO_Reactor
+//
+// = AUTHOR
+// Irfan Pyarali
+//
+// ============================================================================
+
+#include "ace/Task.h"
+#include "ace/Reactor.h"
+#include "ace/WFMO_Reactor.h"
+#include "ace/Get_Opt.h"
+
+ACE_RCSID(WFMO_Reactor, test_multithreading, "$Id$")
+
+static int concurrent_threads = 1;
+static int number_of_handles = ACE_Reactor::instance ()->size ();
+static int number_of_handles_to_signal = 1;
+static int interval = 2;
+static int iterations = 10;
+
+// Explain usage and exit.
+static void
+print_usage_and_die (void)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ "usage: \n\t"
+ "[-t (# of threads - default 1)] \n\t"
+ "[-h (# of handlers) - default 62] \n\t"
+ "[-i (# time interval between signals) - default 2] \n\t"
+ "[-s (# of handles to signal) - default 1] \n\t"
+ "[-e (# of iterations) - default 10] \n\t"));
+ ACE_OS::exit (1);
+}
+
+// Parse the command-line arguments and set options.
+static void
+parse_args (int argc, char **argv)
+{
+ ACE_Get_Opt get_opt (argc, argv, "t:h:s:i:e:");
+ int c;
+
+ while ((c = get_opt ()) != -1)
+ switch (c)
+ {
+ case 't':
+ concurrent_threads = ACE_OS::atoi (get_opt.opt_arg ());
+ break;
+ case 'e':
+ iterations = ACE_OS::atoi (get_opt.opt_arg ());
+ break;
+ case 'h':
+ number_of_handles = ACE_OS::atoi (get_opt.opt_arg ());
+ break;
+ case 'i':
+ interval = ACE_OS::atoi (get_opt.opt_arg ());
+ break;
+ case 's':
+ number_of_handles_to_signal = ACE_OS::atoi (get_opt.opt_arg ());
+ break;
+ default:
+ print_usage_and_die ();
+ break;
+ }
+}
+
+class Task_Handler : public ACE_Task<ACE_NULL_SYNCH>
+{
+public:
+ Task_Handler (size_t number_of_handles,
+ size_t concurrent_threads);
+ // Constructor.
+
+ ~Task_Handler (void);
+ // Destructor.
+
+ virtual int handle_close (ACE_HANDLE handle,
+ ACE_Reactor_Mask close_mask);
+ // Called when object is removed from the ACE_Reactor
+
+ int handle_signal (int signum, siginfo_t * = 0, ucontext_t * = 0);
+ // Handle events being signaled by the main thread.
+
+ virtual int handle_timeout (const ACE_Time_Value &tv,
+ const void *arg = 0);
+ // Called when timer expires.
+
+ int svc (void);
+ // Task event loop.
+
+ int signal (size_t index);
+ // Signal an event.
+
+private:
+ ACE_Auto_Event *events_;
+};
+
+// All threads do reactor->handle_events ()
+int
+Task_Handler::svc (void)
+{
+ // Try to become the owner
+ ACE_Reactor::instance ()->owner (ACE_Thread::self ());
+ // Run the event loop.
+ return ACE_Reactor::run_event_loop ();
+}
+
+Task_Handler::Task_Handler (size_t number_of_handles,
+ size_t concurrent_threads)
+{
+ ACE_NEW (this->events_, ACE_Auto_Event [number_of_handles]);
+
+ for (size_t i = 1; i <= number_of_handles; i++)
+ if (ACE_Reactor::instance ()->register_handler (this,
+ this->events_[i].handle ()) == -1)
+ ACE_ERROR ((LM_ERROR,
+ "%p\t cannot register handle %d with Reactor\n",
+ "Task_Handler::Task_Handler",
+ i));
+
+ // Make us an active object.
+ if (this->activate (THR_NEW_LWP,
+ concurrent_threads) == -1)
+ ACE_ERROR ((LM_ERROR, "%p\t cannot activate task\n",
+ "activate"));
+}
+
+Task_Handler::~Task_Handler (void)
+{
+ delete [] this->events_;
+}
+
+
+int
+Task_Handler::handle_signal (int signum, siginfo_t *siginfo, ucontext_t *)
+{
+ // When signaled, print message, remove self, and add self
+ // This will force Reactor to update its internal handle tables
+
+ if (ACE_Reactor::instance ()->remove_handler (siginfo->si_handle_,
+ ACE_Event_Handler::DONT_CALL) == -1)
+ return -1;
+ // ACE_ERROR_RETURN ((LM_ERROR,
+ // "(%t) %p\tTask cannot be unregistered from Reactor: handle value = %d\n",
+ // "Task_Handler::handle_signal",
+ // siginfo->si_handle_), -1);
+
+ if (ACE_Reactor::instance ()->register_handler (this,
+ siginfo->si_handle_) == -1)
+ return -1;
+ // ACE_ERROR_RETURN ((LM_ERROR,
+ // "(%t) %p\tTask cannot be registered with Reactor: handle value = %d\n",
+ // "Task_Handler::handle_signal",
+ // siginfo->si_handle_), -1);
+ return 0;
+}
+
+int
+Task_Handler::handle_close (ACE_HANDLE handle,
+ ACE_Reactor_Mask close_mask)
+{
+ ACE_DEBUG ((LM_DEBUG, "(%t) handle_close() called: handle value = %d\n",
+ handle));
+ return 0;
+}
+
+int
+Task_Handler::handle_timeout (const ACE_Time_Value &tv,
+ const void *arg)
+{
+ ACE_DEBUG ((LM_DEBUG, "(%t) handle_timeout() called: iteration value = %d\n",
+ int (arg)));
+ return 0;
+}
+
+int
+Task_Handler::signal (size_t index)
+{
+ return this->events_[index].signal ();
+}
+
+int
+main (int argc, char **argv)
+{
+ parse_args (argc, argv);
+ Task_Handler task (number_of_handles,
+ concurrent_threads);
+
+ ACE_OS::srand (ACE_OS::time (0L));
+
+ for (int i = 1; i <= iterations; i++)
+ {
+ // Sleep for a while
+ ACE_OS::sleep (interval);
+
+ // Randomly generate events
+ ACE_DEBUG ((LM_DEBUG, "********************************************************\n"));
+ ACE_DEBUG ((LM_DEBUG, "(%t -- main thread) signaling %d events : iteration = %d\n",
+ number_of_handles_to_signal,
+ i));
+ ACE_DEBUG ((LM_DEBUG, "********************************************************\n"));
+
+ // Setup a timer for the task
+ if (ACE_Reactor::instance ()->schedule_timer (&task,
+ (void *) i,
+ 0) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "schedule_timer"), -1);
+
+ for (int i = 0; i < number_of_handles_to_signal; i++)
+ // Randomly select a handle to signal.
+ task.signal (ACE_OS::rand() % number_of_handles);
+ }
+
+ // Sleep for a while
+ ACE_OS::sleep (interval);
+
+ // End the Reactor event loop
+ ACE_Reactor::end_event_loop ();
+
+ // Wait for all threads to exit
+ ACE_Thread_Manager::instance ()->wait ();
+
+ // Close the Reactor singleton before exiting this function.
+ // If we wait for the Object Manager to do this, it will be too
+ // late since Task_Handler instance would have disappeared.
+ ACE_Reactor::close_singleton ();
+
+ return 0;
+}