summaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
Diffstat (limited to 'examples')
-rw-r--r--examples/Reactor/WFMO_Reactor/Abondoned.dsp58
-rw-r--r--examples/Reactor/WFMO_Reactor/test_abandoned.cpp121
-rw-r--r--examples/Reactor/WFMO_Reactor/test_apc.cpp105
-rw-r--r--examples/Reactor/WFMO_Reactor/test_console_input.cpp84
-rw-r--r--examples/Reactor/WFMO_Reactor/test_directory_changes.cpp115
-rw-r--r--examples/Reactor/WFMO_Reactor/test_exceptions.cpp97
-rw-r--r--examples/Reactor/WFMO_Reactor/test_handle_close.cpp306
-rw-r--r--examples/Reactor/WFMO_Reactor/test_multithreading.cpp245
-rw-r--r--examples/Reactor/WFMO_Reactor/test_network_events.cpp203
-rw-r--r--examples/Reactor/WFMO_Reactor/test_prerun_state_changes.cpp64
-rw-r--r--examples/Reactor/WFMO_Reactor/test_registration.cpp155
-rw-r--r--examples/Reactor/WFMO_Reactor/test_registry_changes.cpp133
-rw-r--r--examples/Reactor/WFMO_Reactor/test_removals.cpp103
-rw-r--r--examples/Reactor/WFMO_Reactor/test_suspended_removals.cpp163
-rw-r--r--examples/Reactor/WFMO_Reactor/test_talker.cpp559
-rw-r--r--examples/Reactor/WFMO_Reactor/test_timeouts.cpp80
-rw-r--r--examples/Reactor/WFMO_Reactor/test_window_messages.cpp90
17 files changed, 2681 insertions, 0 deletions
diff --git a/examples/Reactor/WFMO_Reactor/Abondoned.dsp b/examples/Reactor/WFMO_Reactor/Abondoned.dsp
new file mode 100644
index 00000000000..d113db90ba6
--- /dev/null
+++ b/examples/Reactor/WFMO_Reactor/Abondoned.dsp
@@ -0,0 +1,58 @@
+# Microsoft Developer Studio Project File - Name="Abondoned" - Package Owner=<4>
+# Microsoft Developer Studio Generated Build File, Format Version 6.00
+# ** DO NOT EDIT **
+
+# TARGTYPE "Win32 (x86) Console Application" 0x0103
+
+CFG=Abondoned - Win32 Debug
+!MESSAGE This is not a valid makefile. To build this project using NMAKE,
+!MESSAGE use the Export Makefile command and run
+!MESSAGE
+!MESSAGE NMAKE /f "Abondoned.mak".
+!MESSAGE
+!MESSAGE You can specify a configuration when running NMAKE
+!MESSAGE by defining the macro CFG on the command line. For example:
+!MESSAGE
+!MESSAGE NMAKE /f "Abondoned.mak" CFG="Abondoned - Win32 Debug"
+!MESSAGE
+!MESSAGE Possible choices for configuration are:
+!MESSAGE
+!MESSAGE "Abondoned - Win32 Debug" (based on "Win32 (x86) Console Application")
+!MESSAGE
+
+# Begin Project
+# PROP Scc_ProjName ""
+# PROP Scc_LocalPath ""
+CPP=cl.exe
+RSC=rc.exe
+# PROP BASE Use_MFC 0
+# PROP BASE Use_Debug_Libraries 1
+# PROP BASE Output_Dir "Abondone"
+# PROP BASE Intermediate_Dir "Abondone"
+# PROP BASE Target_Dir ""
+# PROP Use_MFC 0
+# PROP Use_Debug_Libraries 1
+# PROP Output_Dir ""
+# PROP Intermediate_Dir "Debug"
+# PROP Ignore_Export_Lib 0
+# PROP Target_Dir ""
+# ADD BASE CPP /nologo /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /D "_MBCS" /YX /FD /c
+# ADD CPP /nologo /MDd /W3 /Gm /GX /Zi /Od /I "..\..\..\\" /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /D "_MBCS" /FD /c
+# SUBTRACT CPP /YX
+# ADD BASE RSC /l 0x409 /d "_DEBUG"
+# ADD RSC /l 0x409 /d "_DEBUG"
+BSC32=bscmake.exe
+# ADD BASE BSC32 /nologo
+# ADD BSC32 /nologo
+LINK32=link.exe
+# ADD BASE LINK32 kernel32.lib user32.lib gdi32.lib winspool.lib comdlg32.lib advapi32.lib shell32.lib ole32.lib oleaut32.lib uuid.lib odbc32.lib odbccp32.lib /nologo /subsystem:console /debug /machine:I386 /pdbtype:sept
+# ADD LINK32 aced.lib /nologo /subsystem:console /debug /machine:I386 /pdbtype:sept /libpath:"..\..\..\ace"
+# Begin Target
+
+# Name "Abondoned - Win32 Debug"
+# Begin Source File
+
+SOURCE=.\test_abandoned.cpp
+# End Source File
+# End Target
+# End Project
diff --git a/examples/Reactor/WFMO_Reactor/test_abandoned.cpp b/examples/Reactor/WFMO_Reactor/test_abandoned.cpp
new file mode 100644
index 00000000000..b3c6c14b86c
--- /dev/null
+++ b/examples/Reactor/WFMO_Reactor/test_abandoned.cpp
@@ -0,0 +1,121 @@
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// examples
+//
+// = FILENAME
+// test_abandoned.cpp
+//
+// = DESCRIPTION
+//
+// Tests the WFMO_Reactor's ability to handle abandoned mutexes.
+//
+// = AUTHOR
+//
+// Irfan Pyarali <irfan@cs.wustl.edu>
+//
+// ============================================================================
+
+#include "ace/Reactor.h"
+#include "ace/Thread_Manager.h"
+#include "ace/Process_Mutex.h"
+
+ACE_RCSID(WFMO_Reactor, test_abandoned, "$Id$")
+
+class Event_Handler : public ACE_Event_Handler
+{
+public:
+ int handle_signal (int signum,
+ siginfo_t * = 0,
+ ucontext_t * = 0);
+
+ int handle_timeout (const ACE_Time_Value &tv,
+ const void *arg = 0);
+
+ ACE_Auto_Event handle_;
+ ACE_Process_Mutex *mutex_;
+ int iterations_;
+};
+
+static int abandon = 1;
+
+static void *
+worker (void *data)
+{
+ Event_Handler *handler = (Event_Handler *) data;
+
+ handler->handle_.signal ();
+ handler->mutex_->acquire ();
+
+ if (!abandon)
+ handler->mutex_->release ();
+
+ return 0;
+}
+
+int
+Event_Handler::handle_signal (int signum,
+ siginfo_t *s,
+ ucontext_t *)
+{
+ HANDLE handle = s->si_handle_;
+ if (handle == this->handle_.handle ())
+ ACE_Reactor::instance ()->register_handler (this,
+ this->mutex_->lock ().proc_mutex_);
+ else
+ {
+ ACE_Reactor::instance ()->remove_handler (this->mutex_->lock ().proc_mutex_,
+ ACE_Event_Handler::DONT_CALL);
+ delete this->mutex_;
+ }
+
+ return 0;
+}
+
+int
+Event_Handler::handle_timeout (const ACE_Time_Value &tv,
+ const void *arg)
+{
+ --this->iterations_;
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) timeout occured @ %T, iterations left %d\n",
+ this->iterations_));
+
+ if (this->iterations_ == 0)
+ ACE_Reactor::end_event_loop ();
+ else
+ {
+ ACE_NEW_RETURN (this->mutex_,
+ ACE_Process_Mutex,
+ -1);
+ int result = ACE_Thread_Manager::instance ()->spawn (&worker, this);
+ ACE_ASSERT (result != -1);
+ }
+
+ return 0;
+}
+
+int
+main (int argc, char *argv[])
+{
+ Event_Handler event_handler;
+
+ event_handler.iterations_ = 5;
+ int result = ACE_Reactor::instance ()->register_handler
+ (&event_handler,
+ event_handler.handle_.handle ());
+ ACE_ASSERT (result == 0);
+
+ ACE_Time_Value timeout (2);
+ result = ACE_Reactor::instance ()->schedule_timer (&event_handler,
+ 0,
+ timeout,
+ timeout);
+ ACE_ASSERT (result != -1);
+
+ ACE_Reactor::run_event_loop ();
+
+ return 0;
+}
diff --git a/examples/Reactor/WFMO_Reactor/test_apc.cpp b/examples/Reactor/WFMO_Reactor/test_apc.cpp
new file mode 100644
index 00000000000..5a2d79a4de5
--- /dev/null
+++ b/examples/Reactor/WFMO_Reactor/test_apc.cpp
@@ -0,0 +1,105 @@
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// examples
+//
+// = FILENAME
+// test_apc.cpp
+//
+// = DESCRIPTION
+//
+// Tests the WFMO_Reactor's ability to handle regular APC
+// notifications.
+//
+// = AUTHOR
+//
+// Irfan Pyarali <irfan@cs.wustl.edu>
+//
+// ============================================================================
+
+#include "ace/Reactor.h"
+
+ACE_RCSID(WFMO_Reactor, test_apc, "$Id$")
+
+class Event_Handler : public ACE_Event_Handler
+{
+public:
+ int handle_signal (int signum,
+ siginfo_t * = 0,
+ ucontext_t * = 0);
+
+ int handle_timeout (const ACE_Time_Value &tv,
+ const void *arg = 0);
+
+ ACE_Auto_Event handle_;
+ int iterations_;
+};
+
+static Event_Handler *global_event_handler;
+
+static void WINAPI
+apc_callback (DWORD)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) apc occured @ %T\n"));
+
+ global_event_handler->handle_.signal ();
+}
+
+void
+queue_apc (void)
+{
+ DWORD result = ::QueueUserAPC (&apc_callback, // pointer to APC function
+ ::GetCurrentThread (), // handle to the thread
+ 0); // argument for the APC function
+ if (result == FALSE)
+ ACE_OS::exit (-1);
+}
+
+int
+Event_Handler::handle_signal (int signum,
+ siginfo_t *,
+ ucontext_t *)
+{
+ --this->iterations_;
+
+ if (this->iterations_ == 0)
+ ACE_Reactor::end_event_loop ();
+
+ return 0;
+}
+
+int
+Event_Handler::handle_timeout (const ACE_Time_Value &tv,
+ const void *arg)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) timeout occured @ %T\n"));
+ queue_apc ();
+ return 0;
+}
+
+int
+main (int argc, char *argv[])
+{
+ Event_Handler event_handler;
+ event_handler.iterations_ = 5;
+ global_event_handler = &event_handler;
+
+ int result = ACE_Reactor::instance ()->register_handler (&event_handler,
+ event_handler.handle_.handle ());
+ ACE_ASSERT (result == 0);
+
+ ACE_Time_Value timeout (2);
+ result = ACE_Reactor::instance ()->schedule_timer (&event_handler,
+ 0,
+ timeout,
+ timeout);
+ ACE_ASSERT (result != -1);
+
+ ACE_Reactor::run_alertable_event_loop ();
+
+ return 0;
+}
diff --git a/examples/Reactor/WFMO_Reactor/test_console_input.cpp b/examples/Reactor/WFMO_Reactor/test_console_input.cpp
new file mode 100644
index 00000000000..adc9915b0a2
--- /dev/null
+++ b/examples/Reactor/WFMO_Reactor/test_console_input.cpp
@@ -0,0 +1,84 @@
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// examples
+//
+// = FILENAME
+// test_console_input.cpp
+//
+// = DESCRIPTION
+//
+// This application tests the working of WFMO_Reactor when users
+// are interested in console input.
+//
+// = AUTHOR
+// Irfan Pyarali <irfan@cs.wustl.edu>
+//
+// ============================================================================
+
+#include "ace/Reactor.h"
+
+ACE_RCSID(WFMO_Reactor, test_console_input, "$Id$")
+
+class Event_Handler : public ACE_Event_Handler
+{
+public:
+ Event_Handler (ACE_Reactor &reactor);
+ int handle_signal (int signum, siginfo_t * = 0, ucontext_t * = 0);
+ int handle_close (ACE_HANDLE handle,
+ ACE_Reactor_Mask close_mask);
+};
+
+Event_Handler::Event_Handler (ACE_Reactor &reactor)
+{
+ this->reactor (&reactor);
+
+ if (this->reactor ()->register_handler (this,
+ ACE_STDIN) != 0)
+ ACE_ERROR ((LM_ERROR,
+ "Registration with Reactor could not be done\n"));
+}
+
+int
+Event_Handler::handle_signal (int signum, siginfo_t *, ucontext_t *)
+{
+ ACE_TCHAR buffer[BUFSIZ];
+ int result = ACE_OS::read (ACE_STDIN, buffer, sizeof buffer);
+ buffer[result] = '\0';
+
+ if (result <= 0)
+ {
+ this->reactor ()->close ();
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "ACE_OS::read"), -1);
+ }
+
+ if (ACE_OS::strcmp (ACE_TEXT("quit\r\n"), buffer) == 0)
+ this->reactor ()->close ();
+
+ ACE_DEBUG ((LM_DEBUG, "User input: %s", buffer));
+
+ return 0;
+}
+
+int
+Event_Handler::handle_close (ACE_HANDLE handle,
+ ACE_Reactor_Mask close_mask)
+{
+ ACE_DEBUG ((LM_DEBUG, "Event_Handler removed from Reactor\n"));
+ return 0;
+}
+
+int
+main (int, char *[])
+{
+ ACE_Reactor reactor;
+ Event_Handler handler (reactor);
+
+ int result = 0;
+ while (result != -1)
+ result = reactor.handle_events ();
+
+ return 0;
+}
diff --git a/examples/Reactor/WFMO_Reactor/test_directory_changes.cpp b/examples/Reactor/WFMO_Reactor/test_directory_changes.cpp
new file mode 100644
index 00000000000..48c486380cb
--- /dev/null
+++ b/examples/Reactor/WFMO_Reactor/test_directory_changes.cpp
@@ -0,0 +1,115 @@
+// $Id$
+//
+// ============================================================================
+//
+// = LIBRARY
+// examples
+//
+// = FILENAME
+// test_directory_changes.cpp
+//
+// = DESCRIPTION
+//
+// This application tests the working of WFMO_Reactor when users
+// are interested in monitoring changes in the filesystem.
+//
+// = AUTHOR
+// Irfan Pyarali
+//
+// ============================================================================
+
+#include "ace/Reactor.h"
+
+ACE_RCSID(WFMO_Reactor, test_directory_changes, "$Id$")
+
+static int stop_test = 0;
+static const ACE_TCHAR *directory = ACE_TEXT (".");
+static const ACE_TCHAR *temp_file = ACE_TEXT ("foo");
+
+class Event_Handler : public ACE_Event_Handler
+{
+public:
+ Event_Handler (ACE_Reactor &reactor);
+ ~Event_Handler (void);
+ int handle_signal (int signum, siginfo_t * = 0, ucontext_t * = 0);
+ int handle_close (ACE_HANDLE handle,
+ ACE_Reactor_Mask close_mask);
+
+private:
+ ACE_HANDLE handle_;
+};
+
+Event_Handler::Event_Handler (ACE_Reactor &reactor)
+ : handle_ (ACE_INVALID_HANDLE)
+{
+ this->reactor (&reactor);
+
+ int change_notification_flags = FILE_NOTIFY_CHANGE_FILE_NAME;
+
+ this->handle_ = ACE_TEXT_FindFirstChangeNotification (directory, // pointer to name of directory to watch
+ FALSE, // flag for monitoring directory or directory tree
+ change_notification_flags // filter conditions to watch for
+ );
+ if (this->handle_ == ACE_INVALID_HANDLE)
+ ACE_ERROR ((LM_ERROR, "FindFirstChangeNotification could not be setup\n"));
+
+ if (this->reactor ()->register_handler (this,
+ this->handle_) != 0)
+ ACE_ERROR ((LM_ERROR, "Registration with Reactor could not be done\n"));
+}
+
+Event_Handler::~Event_Handler (void)
+{
+}
+
+int
+Event_Handler::handle_signal (int signum, siginfo_t *, ucontext_t *)
+{
+ ::FindNextChangeNotification (this->handle_);
+ if (stop_test)
+ this->reactor ()->close ();
+ return 0;
+}
+
+int
+Event_Handler::handle_close (ACE_HANDLE handle,
+ ACE_Reactor_Mask close_mask)
+{
+ ACE_DEBUG ((LM_DEBUG, "Event_Handler removed from Reactor\n"));
+ ::FindCloseChangeNotification (this->handle_);
+ return 0;
+}
+
+void
+worker (void)
+{
+ ACE_DEBUG ((LM_DEBUG, "(%t) Thread creation\n"));
+ ACE_DEBUG ((LM_DEBUG, "(%t) Thread creating temporary file\n"));
+ ACE_HANDLE file = ACE_OS::open (temp_file, _O_CREAT | _O_EXCL);
+ if (file == ACE_INVALID_HANDLE)
+ ACE_ERROR ((LM_ERROR, "Error in creating %s: %p\n", temp_file, "ACE_OS::open"));
+ else
+ {
+ ACE_OS::close (file);
+ ACE_DEBUG ((LM_DEBUG, "(%t) Thread sleeping\n"));
+ ACE_OS::sleep (3);
+ ACE_DEBUG ((LM_DEBUG, "(%t) Thread removing temporary file\n"));
+ stop_test = 1;
+ ACE_OS::unlink (temp_file);
+ }
+}
+
+int
+main (int, char *[])
+{
+ ACE_Reactor reactor;
+ Event_Handler handler (reactor);
+
+ int result = ACE_OS::thr_create ((ACE_THR_FUNC) worker, 0, 0, 0);
+ ACE_ASSERT (result == 0);
+
+ for (result = 0; result != -1; result = reactor.handle_events ())
+ continue;
+
+ return 0;
+}
diff --git a/examples/Reactor/WFMO_Reactor/test_exceptions.cpp b/examples/Reactor/WFMO_Reactor/test_exceptions.cpp
new file mode 100644
index 00000000000..951959f39e1
--- /dev/null
+++ b/examples/Reactor/WFMO_Reactor/test_exceptions.cpp
@@ -0,0 +1,97 @@
+// $Id$
+//
+// ============================================================================
+//
+// = LIBRARY
+// examples
+//
+// = FILENAME
+// test_exceptions.cpp
+//
+// = DESCRIPTION
+//
+// This test application tests the state of WFMO_Reactor when
+// exceptions occurs when executing user callbacks.
+//
+// The thread count in WFMO_Reactor is used to ensure that state of
+// WFMO_Reactor is not fouled up when exceptions occur in user code.
+// This example also shows how to write event loops that survive
+// user exceptions
+//
+// = AUTHOR
+// Irfan Pyarali
+//
+// ============================================================================
+
+#include "ace/WFMO_Reactor.h"
+
+ACE_RCSID(WFMO_Reactor, test_exceptions, "$Id$")
+
+class Event_Handler : public ACE_Event_Handler
+{
+public:
+ Event_Handler (void)
+ : event_ (1)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "Event_Handler created\n"));
+ }
+
+ ~Event_Handler (void)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "Event_Handler destroyed\n"));
+ }
+
+ int handle_signal (int signum, siginfo_t * = 0, ucontext_t * = 0)
+ {
+ char *cause_exception = 0;
+ char a = *cause_exception;
+ return 0;
+ }
+
+ ACE_HANDLE get_handle (void) const
+ {
+ return this->event_.handle ();
+ }
+private:
+ ACE_Manual_Event event_;
+};
+
+class ACE_WFMO_Reactor_Test
+{
+public:
+ static void doit (ACE_WFMO_Reactor &wfmo_reactor)
+ {
+ for (int i = 1; i <= 10; i++)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "Active threads in WFMO_Reactor (before handle_events) = %d\n",
+ wfmo_reactor.active_threads_));
+ ACE_SEH_TRY
+ {
+ wfmo_reactor.handle_events ();
+ }
+ ACE_SEH_EXCEPT (EXCEPTION_EXECUTE_HANDLER)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "Exception occurred\n"));
+ }
+ ACE_DEBUG ((LM_DEBUG,
+ "Active threads in WFMO_Reactor (after handle_events) = %d\n",
+ wfmo_reactor.active_threads_));
+ }
+ }
+};
+
+int
+main (int, char *[])
+{
+ Event_Handler handler;
+ ACE_WFMO_Reactor wfmo_reactor;
+ wfmo_reactor.register_handler (&handler);
+
+ ACE_WFMO_Reactor_Test::doit (wfmo_reactor);
+
+ return 0;
+}
diff --git a/examples/Reactor/WFMO_Reactor/test_handle_close.cpp b/examples/Reactor/WFMO_Reactor/test_handle_close.cpp
new file mode 100644
index 00000000000..5141262c2a8
--- /dev/null
+++ b/examples/Reactor/WFMO_Reactor/test_handle_close.cpp
@@ -0,0 +1,306 @@
+// $Id$
+//
+// ============================================================================
+//
+// = LIBRARY
+// examples
+//
+// = FILENAME
+// test_handle_close.cpp
+//
+// = DESCRIPTION
+//
+// This application tests whether handle_close gets called and if
+// the correct masks are passed along. The handler should get
+// handle_close called for all three masks (READ, WRITE, and
+// EXCEPT).
+//
+// = AUTHOR
+// Irfan Pyarali
+//
+// ============================================================================
+
+#include "ace/Get_Opt.h"
+#include "ace/Reactor.h"
+#include "ace/WFMO_Reactor.h"
+#include "ace/Select_Reactor.h"
+#include "ace/Auto_Ptr.h"
+#include "ace/Pipe.h"
+
+ACE_RCSID(WFMO_Reactor, test_handle_close, "$Id$")
+
+// Use the WFMO_Reactor
+static int opt_wfmo_reactor = 0;
+
+// Use the Select_Reactor
+static int opt_select_reactor = 0;
+
+// Make pipe readable in main()
+static int write_to_pipe_in_main = 0;
+
+// Cancel reads
+static int cancel_reads = 0;
+
+// Write some data to the pipe. This will cause handle_input to get
+// called.
+void
+write_to_pipe (ACE_Pipe &pipe)
+{
+ char *data = "hello";
+ int len = ACE_OS::strlen (data);
+
+ int result = ACE::send (pipe.write_handle (),
+ data,
+ len);
+ ACE_ASSERT (result == len);
+}
+
+// Simple handler class
+class Handler : public ACE_Event_Handler
+{
+public:
+ Handler (ACE_Pipe &pipe)
+ : pipe_ (pipe)
+ {
+ }
+
+ ACE_HANDLE get_handle (void) const
+ {
+ return this->pipe_.read_handle ();
+ }
+
+ int handle_close (ACE_HANDLE handle,
+ ACE_Reactor_Mask close_mask)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "Handler::handle_close called with mask = %d\n",
+ close_mask));
+ return 0;
+ }
+
+ int handle_input (ACE_HANDLE handle)
+ {
+ ACE_DEBUG ((LM_DEBUG, "Handler::handle_input\n"));
+
+ // Remove for reading
+ return -1;
+ }
+
+ int handle_output (ACE_HANDLE handle)
+ {
+ ACE_DEBUG ((LM_DEBUG, "Handler::handle_output\n"));
+
+ // Optionally cancel reads
+ if (cancel_reads)
+ {
+ int result = ACE_Reactor::instance ()->cancel_wakeup (this,
+ ACE_Event_Handler::READ_MASK);
+ ACE_ASSERT (result != -1);
+ }
+
+ // Write to the pipe; this causes handle_input to get called.
+ if (!write_to_pipe_in_main)
+ write_to_pipe (this->pipe_);
+
+ // Remove for writing
+ return -1;
+ }
+
+protected:
+ ACE_Pipe &pipe_;
+};
+
+class Different_Handler : public ACE_Event_Handler
+{
+public:
+
+ Different_Handler (ACE_Pipe &pipe)
+ : pipe_ (pipe)
+ {
+ }
+
+ ACE_HANDLE get_handle (void) const
+ {
+ return this->pipe_.read_handle ();
+ }
+
+ int handle_close (ACE_HANDLE handle,
+ ACE_Reactor_Mask close_mask)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "Different_Handler::handle_close called with mask = %d\n",
+ close_mask));
+ return 0;
+ }
+
+ int handle_input (ACE_HANDLE handle)
+ {
+ ACE_DEBUG ((LM_DEBUG, "Different_Handler::handle_input\n"));
+
+ // Remove for reading
+ int result = ACE_Reactor::instance ()->remove_handler (this,
+ ACE_Event_Handler::READ_MASK);
+ ACE_ASSERT (result == 0);
+
+ return 0;
+ }
+
+ int handle_output (ACE_HANDLE handle)
+ {
+ ACE_DEBUG ((LM_DEBUG, "Different_Handler::handle_output\n"));
+
+ // Add for reading
+ int result = ACE_Reactor::instance ()->mask_ops (this,
+ ACE_Event_Handler::READ_MASK,
+ ACE_Reactor::ADD_MASK);
+ ACE_ASSERT (result != -1);
+
+ ACE_Reactor_Mask old_masks =
+ ACE_Event_Handler::WRITE_MASK |
+ ACE_Event_Handler::EXCEPT_MASK;
+
+ ACE_ASSERT (old_masks ==
+ ACE_static_cast (ACE_Reactor_Mask, result));
+
+ // Get new masks
+ result = ACE_Reactor::instance ()->mask_ops (this,
+ ACE_Event_Handler::NULL_MASK,
+ ACE_Reactor::GET_MASK);
+ ACE_ASSERT (result != -1);
+
+ ACE_Reactor_Mask current_masks =
+ ACE_Event_Handler::READ_MASK |
+ ACE_Event_Handler::WRITE_MASK |
+ ACE_Event_Handler::EXCEPT_MASK;
+
+ ACE_ASSERT (current_masks ==
+ ACE_static_cast (ACE_Reactor_Mask, result));
+
+ // Remove for writing
+ ACE_Reactor_Mask mask = ACE_Event_Handler::WRITE_MASK | ACE_Event_Handler::DONT_CALL;
+ result = ACE_Reactor::instance ()->remove_handler (this,
+ mask);
+ ACE_ASSERT (result == 0);
+
+ // Write to the pipe; this causes handle_input to get called.
+ if (!write_to_pipe_in_main)
+ write_to_pipe (this->pipe_);
+
+ return 0;
+ }
+
+protected:
+ ACE_Pipe &pipe_;
+};
+
+
+//
+// Selection of which reactor should get created
+//
+void
+create_reactor (void)
+{
+ ACE_Reactor_Impl *impl = 0;
+
+ if (opt_wfmo_reactor)
+ {
+#if defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)
+ ACE_NEW (impl,
+ ACE_WFMO_Reactor);
+#endif /* ACE_WIN32 */
+ }
+ else if (opt_select_reactor)
+ ACE_NEW (impl,
+ ACE_Select_Reactor);
+
+ ACE_Reactor *reactor = 0;
+ ACE_NEW (reactor,
+ ACE_Reactor (impl));
+ ACE_Reactor::instance (reactor);
+}
+
+int
+main (int argc, char *argv[])
+{
+ int result = 0;
+
+ // Parse args
+ ACE_Get_Opt getopt (argc, argv, ACE_TEXT ("swmc"), 1);
+ for (int c; (c = getopt ()) != -1; )
+ switch (c)
+ {
+ case 's':
+ opt_select_reactor = 1;
+ break;
+ case 'w':
+ opt_wfmo_reactor = 1;
+ break;
+ case 'm':
+ write_to_pipe_in_main = 1;
+ break;
+ case 'c':
+ cancel_reads = 1;
+ break;
+ }
+
+ // Create pipes
+ ACE_Pipe pipe1, pipe2;
+
+ result = pipe1.open ();
+ ACE_ASSERT (result == 0);
+
+ result = pipe2.open ();
+ ACE_ASSERT (result == 0);
+
+ // Create handlers
+ Handler handler (pipe1);
+ Different_Handler different_handler (pipe2);
+
+ // Create reactor
+ create_reactor ();
+
+ // Manage memory automagically.
+ auto_ptr<ACE_Reactor> reactor (ACE_Reactor::instance ());
+ auto_ptr<ACE_Reactor_Impl> impl;
+
+ // If we are using other that the default implementation, we must
+ // clean up.
+ if (opt_select_reactor || opt_wfmo_reactor)
+ impl = auto_ptr<ACE_Reactor_Impl> (ACE_Reactor::instance ()->implementation ());
+
+ // Register handlers
+ ACE_Reactor_Mask handler_mask =
+ ACE_Event_Handler::READ_MASK |
+ ACE_Event_Handler::WRITE_MASK |
+ ACE_Event_Handler::EXCEPT_MASK;
+
+ ACE_Reactor_Mask different_handler_mask =
+ ACE_Event_Handler::WRITE_MASK |
+ ACE_Event_Handler::EXCEPT_MASK;
+
+ result = ACE_Reactor::instance ()->register_handler (&handler,
+ handler_mask);
+ ACE_ASSERT (result == 0);
+
+ result = ACE_Reactor::instance ()->register_handler (&different_handler,
+ different_handler_mask);
+ ACE_ASSERT (result == 0);
+
+ // Write to the pipe; this causes handle_input to get called.
+ if (write_to_pipe_in_main)
+ {
+ write_to_pipe (pipe1);
+ write_to_pipe (pipe2);
+ }
+
+ // Note that handle_output will get called automatically since the
+ // pipe is writable!
+
+ // Run for three seconds
+ ACE_Time_Value time (3);
+ ACE_Reactor::instance ()->run_event_loop (time);
+
+ ACE_DEBUG ((LM_DEBUG, "\nClosing down the application\n\n"));
+
+ return 0;
+}
diff --git a/examples/Reactor/WFMO_Reactor/test_multithreading.cpp b/examples/Reactor/WFMO_Reactor/test_multithreading.cpp
new file mode 100644
index 00000000000..ee368d18c11
--- /dev/null
+++ b/examples/Reactor/WFMO_Reactor/test_multithreading.cpp
@@ -0,0 +1,245 @@
+// $Id$
+//
+// ============================================================================
+//
+// = LIBRARY
+// examples
+//
+// = FILENAME
+// test_multithreading.cpp
+//
+// = DESCRIPTION
+//
+// This application tests multiple threads simultaneously calling
+// Reactor::handle_events(). It also shows how different threads
+// can update the state of Reactor by registering and removing
+// Event_Handlers.
+//
+// Note that this test will only work with WFMO_Reactor
+//
+// = AUTHOR
+// Irfan Pyarali
+//
+// ============================================================================
+
+#include "ace/Task.h"
+#include "ace/Reactor.h"
+#include "ace/WFMO_Reactor.h"
+#include "ace/Get_Opt.h"
+
+ACE_RCSID(WFMO_Reactor, test_multithreading, "$Id$")
+
+static int concurrent_threads = 1;
+static int number_of_handles = ACE_Reactor::instance ()->size ();
+static int number_of_handles_to_signal = 1;
+static int interval = 2;
+static int iterations = 10;
+
+// Explain usage and exit.
+static void
+print_usage_and_die (void)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ "usage: \n\t"
+ "[-t (# of threads - default 1)] \n\t"
+ "[-h (# of handlers) - default 62] \n\t"
+ "[-i (# time interval between signals) - default 2] \n\t"
+ "[-s (# of handles to signal) - default 1] \n\t"
+ "[-e (# of iterations) - default 10] \n\t"));
+ ACE_OS::exit (1);
+}
+
+// Parse the command-line arguments and set options.
+static void
+parse_args (int argc, char **argv)
+{
+ ACE_Get_Opt get_opt (argc, argv, "t:h:s:i:e:");
+ int c;
+
+ while ((c = get_opt ()) != -1)
+ switch (c)
+ {
+ case 't':
+ concurrent_threads = ACE_OS::atoi (get_opt.opt_arg ());
+ break;
+ case 'e':
+ iterations = ACE_OS::atoi (get_opt.opt_arg ());
+ break;
+ case 'h':
+ number_of_handles = ACE_OS::atoi (get_opt.opt_arg ());
+ break;
+ case 'i':
+ interval = ACE_OS::atoi (get_opt.opt_arg ());
+ break;
+ case 's':
+ number_of_handles_to_signal = ACE_OS::atoi (get_opt.opt_arg ());
+ break;
+ default:
+ print_usage_and_die ();
+ break;
+ }
+}
+
+class Task_Handler : public ACE_Task<ACE_NULL_SYNCH>
+{
+public:
+ Task_Handler (size_t number_of_handles,
+ size_t concurrent_threads);
+ // Constructor.
+
+ ~Task_Handler (void);
+ // Destructor.
+
+ virtual int handle_close (ACE_HANDLE handle,
+ ACE_Reactor_Mask close_mask);
+ // Called when object is removed from the ACE_Reactor
+
+ int handle_signal (int signum, siginfo_t * = 0, ucontext_t * = 0);
+ // Handle events being signaled by the main thread.
+
+ virtual int handle_timeout (const ACE_Time_Value &tv,
+ const void *arg = 0);
+ // Called when timer expires.
+
+ int svc (void);
+ // Task event loop.
+
+ int signal (size_t index);
+ // Signal an event.
+
+private:
+ ACE_Auto_Event *events_;
+};
+
+// All threads do reactor->handle_events ()
+int
+Task_Handler::svc (void)
+{
+ // Try to become the owner
+ ACE_Reactor::instance ()->owner (ACE_Thread::self ());
+ // Run the event loop.
+ return ACE_Reactor::run_event_loop ();
+}
+
+Task_Handler::Task_Handler (size_t number_of_handles,
+ size_t concurrent_threads)
+{
+ ACE_NEW (this->events_, ACE_Auto_Event [number_of_handles]);
+
+ for (size_t i = 1; i <= number_of_handles; i++)
+ if (ACE_Reactor::instance ()->register_handler (this,
+ this->events_[i].handle ()) == -1)
+ ACE_ERROR ((LM_ERROR,
+ "%p\t cannot register handle %d with Reactor\n",
+ "Task_Handler::Task_Handler",
+ i));
+
+ // Make us an active object.
+ if (this->activate (THR_NEW_LWP,
+ concurrent_threads) == -1)
+ ACE_ERROR ((LM_ERROR, "%p\t cannot activate task\n",
+ "activate"));
+}
+
+Task_Handler::~Task_Handler (void)
+{
+ delete [] this->events_;
+}
+
+
+int
+Task_Handler::handle_signal (int signum, siginfo_t *siginfo, ucontext_t *)
+{
+ // When signaled, print message, remove self, and add self
+ // This will force Reactor to update its internal handle tables
+
+ if (ACE_Reactor::instance ()->remove_handler (siginfo->si_handle_,
+ ACE_Event_Handler::DONT_CALL) == -1)
+ return -1;
+ // ACE_ERROR_RETURN ((LM_ERROR,
+ // "(%t) %p\tTask cannot be unregistered from Reactor: handle value = %d\n",
+ // "Task_Handler::handle_signal",
+ // siginfo->si_handle_), -1);
+
+ if (ACE_Reactor::instance ()->register_handler (this,
+ siginfo->si_handle_) == -1)
+ return -1;
+ // ACE_ERROR_RETURN ((LM_ERROR,
+ // "(%t) %p\tTask cannot be registered with Reactor: handle value = %d\n",
+ // "Task_Handler::handle_signal",
+ // siginfo->si_handle_), -1);
+ return 0;
+}
+
+int
+Task_Handler::handle_close (ACE_HANDLE handle,
+ ACE_Reactor_Mask close_mask)
+{
+ ACE_DEBUG ((LM_DEBUG, "(%t) handle_close() called: handle value = %d\n",
+ handle));
+ return 0;
+}
+
+int
+Task_Handler::handle_timeout (const ACE_Time_Value &tv,
+ const void *arg)
+{
+ ACE_DEBUG ((LM_DEBUG, "(%t) handle_timeout() called: iteration value = %d\n",
+ int (arg)));
+ return 0;
+}
+
+int
+Task_Handler::signal (size_t index)
+{
+ return this->events_[index].signal ();
+}
+
+int
+main (int argc, char **argv)
+{
+ parse_args (argc, argv);
+ Task_Handler task (number_of_handles,
+ concurrent_threads);
+
+ ACE_OS::srand (ACE_OS::time (0L));
+
+ for (int i = 1; i <= iterations; i++)
+ {
+ // Sleep for a while
+ ACE_OS::sleep (interval);
+
+ // Randomly generate events
+ ACE_DEBUG ((LM_DEBUG, "********************************************************\n"));
+ ACE_DEBUG ((LM_DEBUG, "(%t -- main thread) signaling %d events : iteration = %d\n",
+ number_of_handles_to_signal,
+ i));
+ ACE_DEBUG ((LM_DEBUG, "********************************************************\n"));
+
+ // Setup a timer for the task
+ if (ACE_Reactor::instance ()->schedule_timer (&task,
+ (void *) i,
+ 0) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "schedule_timer"), -1);
+
+ for (int i = 0; i < number_of_handles_to_signal; i++)
+ // Randomly select a handle to signal.
+ task.signal (ACE_OS::rand() % number_of_handles);
+ }
+
+ // Sleep for a while
+ ACE_OS::sleep (interval);
+
+ // End the Reactor event loop
+ ACE_Reactor::end_event_loop ();
+
+ // Wait for all threads to exit
+ ACE_Thread_Manager::instance ()->wait ();
+
+ // Close the Reactor singleton before exiting this function.
+ // If we wait for the Object Manager to do this, it will be too
+ // late since Task_Handler instance would have disappeared.
+ ACE_Reactor::close_singleton ();
+
+ return 0;
+}
diff --git a/examples/Reactor/WFMO_Reactor/test_network_events.cpp b/examples/Reactor/WFMO_Reactor/test_network_events.cpp
new file mode 100644
index 00000000000..3f3c7fdcd20
--- /dev/null
+++ b/examples/Reactor/WFMO_Reactor/test_network_events.cpp
@@ -0,0 +1,203 @@
+// $Id$
+//
+// ============================================================================
+//
+// = LIBRARY
+// examples
+//
+// = FILENAME
+// test_network_events.cpp
+//
+// = DESCRIPTION
+//
+// This application tests Reactor to make sure that it responds
+// correctly to different kinds of network events.
+//
+// The test starts off by creating a Network_Listener, that listens
+// for connections at ACE_DEFAULT_SERVER_PORT. When a client
+// connects, a Network_Handler is created. Network_Handler reads
+// messages off the socket and prints them out. This is done until
+// the remote side shuts down. Multiple clients can connect at the
+// same time.
+//
+// Events tested in this example includes ACCEPT, READ, and CLOSE masks.
+//
+// To run this example, start an instance of this example and
+// connect to it using telnet (to port
+// ACE_DEFAULT_SERVER_PORT(10002)).
+//
+// = AUTHOR
+// Irfan Pyarali
+//
+// ============================================================================
+
+#include "ace/Reactor.h"
+#include "ace/WFMO_Reactor.h"
+#include "ace/INET_Addr.h"
+#include "ace/SOCK_Stream.h"
+#include "ace/SOCK_Acceptor.h"
+
+ACE_RCSID(WFMO_Reactor, test_network_events, "$Id$")
+
+class Network_Handler : public ACE_Event_Handler
+{
+public:
+ Network_Handler (ACE_SOCK_Stream &s);
+ // Default constructor
+
+ virtual int handle_input (ACE_HANDLE handle);
+ virtual int handle_close (ACE_HANDLE handle,
+ ACE_Reactor_Mask close_mask);
+ virtual ACE_HANDLE get_handle (void) const;
+
+ ACE_SOCK_Stream stream_;
+
+};
+
+Network_Handler::Network_Handler (ACE_SOCK_Stream &s)
+ : stream_ (s)
+{
+ this->reactor (ACE_Reactor::instance ());
+
+ int result = this->reactor ()->register_handler (this, READ_MASK);
+ ACE_ASSERT (result == 0);
+}
+
+ACE_HANDLE
+Network_Handler::get_handle (void) const
+{
+ return this->stream_.get_handle ();
+}
+
+int
+Network_Handler::handle_input (ACE_HANDLE handle)
+{
+ ACE_DEBUG ((LM_DEBUG, "Network_Handler::handle_input handle = %d\n", handle));
+
+ while (1)
+ {
+ char message[BUFSIZ];
+ int result = this->stream_.recv (message, sizeof message);
+ if (result > 0)
+ {
+ message[result] = 0;
+ ACE_DEBUG ((LM_DEBUG, "Remote message: %s\n", message));
+ }
+ else if (result == 0)
+ {
+ ACE_DEBUG ((LM_DEBUG, "Connection closed\n"));
+ return -1;
+ }
+ else if (errno == EWOULDBLOCK)
+ {
+ return 0;
+ }
+ else
+ {
+ ACE_DEBUG ((LM_DEBUG, "Problems in receiving data, result = %d", result));
+ return -1;
+ }
+ }
+}
+
+int
+Network_Handler::handle_close (ACE_HANDLE handle,
+ ACE_Reactor_Mask close_mask)
+{
+ ACE_DEBUG ((LM_DEBUG, "Network_Handler::handle_close handle = %d\n", handle));
+
+ this->stream_.close ();
+ delete this;
+ return 0;
+}
+
+class Network_Listener : public ACE_Event_Handler
+{
+public:
+ Network_Listener (void);
+ // Default constructor
+ ~Network_Listener (void);
+ // Default constructor
+
+ virtual int handle_input (ACE_HANDLE handle);
+ virtual int handle_close (ACE_HANDLE handle,
+ ACE_Reactor_Mask close_mask);
+ ACE_HANDLE get_handle (void) const;
+
+ ACE_INET_Addr local_address_;
+ ACE_SOCK_Acceptor acceptor_;
+};
+
+Network_Listener::Network_Listener (void)
+ : local_address_ (ACE_DEFAULT_SERVER_PORT),
+ acceptor_ (local_address_, 1)
+{
+ this->reactor (ACE_Reactor::instance ());
+ int result = this->reactor ()->register_handler (this,
+ ACE_Event_Handler::ACCEPT_MASK);
+ ACE_ASSERT (result == 0);
+}
+
+Network_Listener::~Network_Listener (void)
+{
+ this->reactor ()->remove_handler (this, ACE_Event_Handler::ACCEPT_MASK ||
+ ACE_Event_Handler::DONT_CALL);
+ this->handle_close (this->get_handle (), ACE_Event_Handler::ALL_EVENTS_MASK);
+}
+
+ACE_HANDLE
+Network_Listener::get_handle (void) const
+{
+ return this->acceptor_.get_handle ();
+}
+
+int
+Network_Listener::handle_input (ACE_HANDLE handle)
+{
+ ACE_DEBUG ((LM_DEBUG, "Network_Listener::handle_input handle = %d\n", handle));
+
+ ACE_INET_Addr remote_address;
+ ACE_SOCK_Stream stream;
+
+ // Try to find out if the implementation of the reactor that we are
+ // using requires us to reset the event association for the newly
+ // created handle. This is because the newly created handle will
+ // inherit the properties of the listen handle, including its event
+ // associations.
+ int reset_new_handle = this->reactor ()->uses_event_associations ();
+
+ int result = this->acceptor_.accept (stream, // stream
+ &remote_address, // remote address
+ 0, // timeout
+ 1, // restart
+ reset_new_handle); // reset new handler
+ ACE_ASSERT (result == 0);
+
+ ACE_DEBUG ((LM_DEBUG, "Remote connection from: "));
+ remote_address.dump ();
+
+ Network_Handler *handler;
+ ACE_NEW_RETURN (handler, Network_Handler (stream), -1);
+
+ return 0;
+}
+
+int
+Network_Listener::handle_close (ACE_HANDLE handle,
+ ACE_Reactor_Mask close_mask)
+{
+ ACE_DEBUG ((LM_DEBUG, "Network_Listener::handle_close handle = %d\n", handle));
+
+ this->acceptor_.close ();
+ return 0;
+}
+
+int
+main (int, char *[])
+{
+ Network_Listener listener;
+
+ ACE_Reactor::run_event_loop ();
+
+ return 0;
+};
diff --git a/examples/Reactor/WFMO_Reactor/test_prerun_state_changes.cpp b/examples/Reactor/WFMO_Reactor/test_prerun_state_changes.cpp
new file mode 100644
index 00000000000..fb880b8c8ee
--- /dev/null
+++ b/examples/Reactor/WFMO_Reactor/test_prerun_state_changes.cpp
@@ -0,0 +1,64 @@
+// $Id$
+//
+// ============================================================================
+//
+// = LIBRARY
+// examples
+//
+// = FILENAME
+// test_prerun_state_changes.cpp
+//
+// = DESCRIPTION
+//
+// Tests the Reactor's ability to handle state changes before
+// getting a chance to run.
+//
+// = AUTHOR
+//
+// Irfan Pyarali
+//
+// ============================================================================
+
+#include "ace/Reactor.h"
+
+ACE_RCSID(WFMO_Reactor, test_prerun_state_changes, "$Id$")
+
+class Event_Handler : public ACE_Event_Handler
+// = TITLE
+// Generic Event Handler.
+//
+{
+public:
+ virtual int handle_close (ACE_HANDLE handle, ACE_Reactor_Mask mask)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "event handler %d closed.\n",
+ (int) handle));
+ delete this;
+ return 0;
+ }
+};
+
+int
+main (int argc, char *argv[])
+{
+ ACE_HANDLE handle = (ACE_HANDLE) ::socket (PF_INET, SOCK_STREAM, 0);
+
+ Event_Handler *event_handler = new Event_Handler;
+
+ int result = ACE_Reactor::instance ()->register_handler (handle,
+ event_handler,
+ ACE_Event_Handler::READ_MASK);
+ ACE_ASSERT (result == 0);
+
+ result = ACE_Reactor::instance ()->register_handler (handle,
+ event_handler,
+ ACE_Event_Handler::WRITE_MASK | ACE_Event_Handler::QOS_MASK);
+ ACE_ASSERT (result == 0);
+
+ result = ACE_Reactor::instance ()->remove_handler (handle,
+ ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL);
+ ACE_ASSERT (result == 0);
+
+ return 0;
+}
diff --git a/examples/Reactor/WFMO_Reactor/test_registration.cpp b/examples/Reactor/WFMO_Reactor/test_registration.cpp
new file mode 100644
index 00000000000..6409efc2749
--- /dev/null
+++ b/examples/Reactor/WFMO_Reactor/test_registration.cpp
@@ -0,0 +1,155 @@
+// $Id$
+//
+// ============================================================================
+//
+// = LIBRARY
+// examples
+//
+// = FILENAME
+// test_registration.cpp
+//
+// = DESCRIPTION
+//
+// This test application tests a wide range of registration,
+// suspension, resumption, and removal of events from Reactor.
+//
+// The application initially registers two events with Reactor. A
+// auxiliary thread is created to do the signaling on the
+// events. When the first event is signaled, the event is suspended
+// from Reactor. The event is then signaled again, but is "lost"
+// since the handler has been suspended. When the second event is
+// signal, the first event is resumed and the second is
+// suspended. When the first event is signaled again, both events
+// are removed from Reactor.
+//
+// This test shows off the following features of Reactor:
+// - Registration
+// - Suspension
+// - Resumption
+// - Removal (while active and while suspended)
+//
+// = AUTHOR
+// Irfan Pyarali
+//
+// ============================================================================
+
+#include "ace/Reactor.h"
+
+ACE_RCSID(WFMO_Reactor, test_registration, "$Id$")
+
+// Globals for this test
+int stop_test = 0;
+ACE_Reactor reactor;
+
+
+class Simple_Handler : public ACE_Event_Handler
+{
+public:
+ Simple_Handler (void);
+ // Default constructor
+
+ virtual int handle_signal (int signum, siginfo_t * = 0, ucontext_t * = 0);
+ virtual int handle_close (ACE_HANDLE handle,
+ ACE_Reactor_Mask close_mask);
+
+ ACE_Auto_Event event1_;
+ ACE_Auto_Event event2_;
+ int handle_signal_count_;
+ int handle_close_count_;
+};
+
+Simple_Handler::Simple_Handler (void)
+ : handle_signal_count_ (0),
+ handle_close_count_ (0)
+{
+}
+
+int
+Simple_Handler::handle_signal (int signum, siginfo_t *s, ucontext_t *)
+{
+ ACE_HANDLE handle = s->si_handle_;
+
+ this->handle_signal_count_++;
+
+ if (this->handle_signal_count_ == 1)
+ this->reactor ()->suspend_handler (event1_.handle ());
+ else if (this->handle_signal_count_ == 2)
+ {
+ this->reactor ()->resume_handler (event1_.handle ());
+ this->reactor ()->suspend_handler (event2_.handle ());
+ }
+ else if (this->handle_signal_count_ == 3)
+ {
+ this->reactor ()->remove_handler (event1_.handle (),
+ ACE_Event_Handler::NULL_MASK);
+ this->reactor ()->remove_handler (event2_.handle (),
+ ACE_Event_Handler::NULL_MASK);
+ }
+ return 0;
+}
+
+int
+Simple_Handler::handle_close (ACE_HANDLE handle,
+ ACE_Reactor_Mask close_mask)
+{
+ ACE_DEBUG ((LM_DEBUG, "Simple_Handler::handle_close handle = %d\n", handle));
+ this->handle_close_count_++;
+
+ if (this->handle_close_count_ == 1)
+ stop_test = 0;
+ else if (this->handle_close_count_ == 2)
+ stop_test = 1;
+
+ return 0;
+}
+
+// Globals for this test
+Simple_Handler simple_handler;
+
+void
+worker (void)
+{
+ ACE_DEBUG ((LM_DEBUG, "(%t) Thread creation\n"));
+ ACE_DEBUG ((LM_DEBUG, "(%t) Thread sleeping\n"));
+ ACE_OS::sleep (1);
+ ACE_DEBUG ((LM_DEBUG, "(%t) Thread signaling %d\n", simple_handler.event1_.handle()));
+ simple_handler.event1_.signal ();
+ ACE_DEBUG ((LM_DEBUG, "(%t) Thread sleeping\n"));
+ ACE_OS::sleep (1);
+ ACE_DEBUG ((LM_DEBUG, "(%t) Thread signaling %d\n", simple_handler.event1_.handle()));
+ ACE_DEBUG ((LM_DEBUG, "Note: This signal should be \"lost\" because of the suspended handler\n"));
+ simple_handler.event1_.signal ();
+ ACE_DEBUG ((LM_DEBUG, "(%t) Thread sleeping\n"));
+ ACE_OS::sleep (1);
+ ACE_DEBUG ((LM_DEBUG, "(%t) Thread resetting %d\n", simple_handler.event1_.handle()));
+ simple_handler.event1_.reset ();
+ ACE_DEBUG ((LM_DEBUG, "(%t) Thread signaling %d\n", simple_handler.event2_.handle()));
+ simple_handler.event2_.signal ();
+ ACE_DEBUG ((LM_DEBUG, "(%t) Thread sleeping\n"));
+ ACE_OS::sleep (1);
+ ACE_DEBUG ((LM_DEBUG, "(%t) Thread signaling %d\n", simple_handler.event1_.handle()));
+ simple_handler.event1_.signal ();
+ ACE_DEBUG ((LM_DEBUG, "(%t) Thread death\n"));
+}
+
+int
+main (int, char *[])
+{
+ int result = reactor.register_handler (&simple_handler,
+ simple_handler.event1_.handle ());
+ ACE_ASSERT (result == 0);
+
+ result = reactor.register_handler (&simple_handler,
+ simple_handler.event2_.handle ());
+ ACE_ASSERT (result == 0);
+
+ result = ACE_OS::thr_create ((ACE_THR_FUNC) worker, 0, 0, 0);
+ ACE_ASSERT (result == 0);
+
+ result = 0;
+ while (!stop_test && result != -1)
+ {
+ result = reactor.handle_events ();
+ }
+ return 0;
+};
diff --git a/examples/Reactor/WFMO_Reactor/test_registry_changes.cpp b/examples/Reactor/WFMO_Reactor/test_registry_changes.cpp
new file mode 100644
index 00000000000..1dd25680a1f
--- /dev/null
+++ b/examples/Reactor/WFMO_Reactor/test_registry_changes.cpp
@@ -0,0 +1,133 @@
+// $Id$
+//
+// ============================================================================
+//
+// = LIBRARY
+// examples
+//
+// = FILENAME
+// test_registry_changes.cpp
+//
+// = DESCRIPTION
+//
+// This application tests the working of Reactor when users are
+// interested in monitoring changes in the registry.
+//
+// = AUTHOR
+// Irfan Pyarali
+//
+// ============================================================================
+
+#include "ace/Reactor.h"
+#include "ace/Registry.h"
+
+ACE_RCSID(WFMO_Reactor, test_registry_changes, "$Id$")
+
+static int stop_test = 0;
+static HKEY context_to_monitor = HKEY_CURRENT_USER;
+static const ACE_TCHAR *temp_context_name = ACE_TEXT ("ACE temporary context");
+
+class Event_Handler : public ACE_Event_Handler
+{
+public:
+ Event_Handler (ACE_Reactor &reactor);
+ ~Event_Handler (void);
+ int handle_signal (int signum, siginfo_t * = 0, ucontext_t * = 0);
+ int handle_close (ACE_HANDLE handle,
+ ACE_Reactor_Mask close_mask);
+ ACE_Registry::Naming_Context &context (void);
+
+private:
+ ACE_Auto_Event event_;
+ ACE_Registry::Naming_Context context_;
+};
+
+Event_Handler::Event_Handler (ACE_Reactor &reactor)
+ : context_ (context_to_monitor)
+{
+ this->reactor (&reactor);
+
+ if (::RegNotifyChangeKeyValue (this->context_.key (), // handle of key to watch
+ FALSE, // flag for subkey notification
+ REG_NOTIFY_CHANGE_NAME, // changes to be reported
+ this->event_.handle (), // handle of signaled event
+ TRUE // flag for asynchronous reporting
+ ) != ERROR_SUCCESS)
+ ACE_ERROR ((LM_ERROR, "RegNotifyChangeKeyValue could not be setup\n"));
+
+ if (this->reactor ()->register_handler (this,
+ this->event_.handle ()) != 0)
+ ACE_ERROR ((LM_ERROR, "Registration with Reactor could not be done\n"));
+}
+
+Event_Handler::~Event_Handler (void)
+{
+}
+
+int
+Event_Handler::handle_signal (int signum, siginfo_t *, ucontext_t *)
+{
+ if (stop_test)
+ this->reactor ()->close ();
+ else if (::RegNotifyChangeKeyValue (this->context_.key (), // handle of key to watch
+ FALSE, // flag for subkey notification
+ REG_NOTIFY_CHANGE_NAME, // changes to be reported
+ this->event_.handle (), // handle of signaled event
+ TRUE // flag for asynchronous reporting
+ ) != ERROR_SUCCESS)
+ ACE_ERROR ((LM_ERROR,
+ "RegNotifyChangeKeyValue could not be setup\n"));
+ return 0;
+}
+
+int
+Event_Handler::handle_close (ACE_HANDLE handle,
+ ACE_Reactor_Mask close_mask)
+{
+ ACE_DEBUG ((LM_DEBUG, "Event_Handler removed from Reactor\n"));
+ return 0;
+}
+
+ACE_Registry::Naming_Context &
+Event_Handler::context (void)
+{
+ return this->context_;
+}
+
+void
+worker (Event_Handler *event_handler)
+{
+ ACE_DEBUG ((LM_DEBUG, "(%t) Thread creation\n"));
+ ACE_DEBUG ((LM_DEBUG, "(%t) Thread creating temporary registry entry\n"));
+
+ ACE_Registry::Naming_Context temp_context;
+ int result = event_handler->context ().bind_new_context (temp_context_name,
+ temp_context);
+
+ if (result == -1)
+ ACE_ERROR ((LM_ERROR, "Error in creating %s: %p\n", temp_context_name, "bind_new_context"));
+ else
+ {
+ ACE_DEBUG ((LM_DEBUG, "(%t) Thread sleeping\n"));
+ ACE_OS::sleep (3);
+
+ ACE_DEBUG ((LM_DEBUG, "(%t) Thread removing registry entry\n"));
+ stop_test = 1;
+ event_handler->context ().unbind_context (temp_context_name);
+ }
+}
+
+int
+main (int, char *[])
+{
+ ACE_Reactor reactor;
+ Event_Handler handler (reactor);
+
+ int result = ACE_OS::thr_create ((ACE_THR_FUNC) worker, &handler, 0, 0);
+ ACE_ASSERT (result == 0);
+
+ for (result = 0; result != -1; result = reactor.handle_events ())
+ continue;
+
+ return 0;
+}
diff --git a/examples/Reactor/WFMO_Reactor/test_removals.cpp b/examples/Reactor/WFMO_Reactor/test_removals.cpp
new file mode 100644
index 00000000000..1c4b4acf687
--- /dev/null
+++ b/examples/Reactor/WFMO_Reactor/test_removals.cpp
@@ -0,0 +1,103 @@
+// $Id$
+//
+// ============================================================================
+//
+// = LIBRARY
+// examples
+//
+// = FILENAME
+// test_removals.cpp
+//
+// = DESCRIPTION
+//
+// Tests the Reactor's ability to handle simultaneous events. If
+// you pass anything on the command-line, then each handler
+// requests to be removed from the Reactor after each event.
+//
+// = AUTHOR
+// Tim Harrison
+// Irfan Pyarali
+//
+// ============================================================================
+
+#include "ace/Reactor.h"
+#include "ace/Service_Config.h"
+#include "ace/Synch.h"
+
+ACE_RCSID(WFMO_Reactor, test_removals, "$Id$")
+
+class Event_Handler : public ACE_Event_Handler
+// = TITLE
+// Generic Event Handler.
+//
+// = DESCRIPTION
+//
+// Creates event. Registers with Reactor. Signals event. If
+// created with -close_down- it returns -1 from handle signal.
+{
+public:
+ Event_Handler (int event_number,
+ int close_down)
+ : event_number_ (event_number),
+ close_down_ (close_down)
+ {
+ if (ACE_Reactor::instance ()->register_handler (this,
+ this->event_.handle ()) == -1)
+ ACE_ERROR ((LM_ERROR, "%p\tevent handler %d cannot be added to Reactor\n", "", event_number_));
+ this->event_.signal ();
+ }
+
+ virtual int handle_signal (int index, siginfo_t *, ucontext_t *)
+ {
+ if (this->close_down_)
+ return -1;
+ else
+ return 0;
+ }
+
+ virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask)
+ {
+ ACE_DEBUG ((LM_DEBUG, "event handler %d closed.\n", event_number_));
+ delete this;
+ return 0;
+ }
+
+ virtual ACE_HANDLE get_handle (void) const
+ {
+ return event_.handle ();
+ }
+
+private:
+ int event_number_;
+ // Our event number.
+
+ int close_down_;
+ // Shall we close down or not.
+
+ ACE_Event event_;
+ // Signaled to shut down the handler.
+};
+
+int
+main (int argc, char *argv[])
+{
+ int close_down = argc > 1 ? 1 : 0;
+
+ for (size_t i = 1; i <= ACE_Reactor::instance ()->size (); i++)
+ new Event_Handler (i, close_down);
+
+ int result = 0;
+ ACE_Time_Value time (1);
+ while (1)
+ {
+ result = ACE_Reactor::instance ()->handle_events (time);
+ if (result == 0 && errno == ETIME)
+ {
+ ACE_DEBUG ((LM_DEBUG, "No more work left: timing out\n"));
+ break;
+ }
+ if (result == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p.\n", "main"), -1);
+ }
+ return 0;
+}
diff --git a/examples/Reactor/WFMO_Reactor/test_suspended_removals.cpp b/examples/Reactor/WFMO_Reactor/test_suspended_removals.cpp
new file mode 100644
index 00000000000..4dc7cc33bb7
--- /dev/null
+++ b/examples/Reactor/WFMO_Reactor/test_suspended_removals.cpp
@@ -0,0 +1,163 @@
+// $Id$
+//
+// ============================================================================
+//
+// = LIBRARY
+// examples
+//
+// = FILENAME
+// test_suspended_removals.cpp
+//
+// = DESCRIPTION
+//
+// Tests the Reactor's ability to handle removal of suspended
+// handles.
+//
+// = AUTHOR
+// Irfan Pyarali
+//
+// ============================================================================
+
+#include "ace/Reactor.h"
+#include "ace/WFMO_Reactor.h"
+#include "ace/Synch.h"
+
+ACE_RCSID(WFMO_Reactor, test_suspended_removals, "$Id$")
+
+class Event_Handler : public ACE_Event_Handler
+{
+public:
+
+ ACE_HANDLE get_handle (void) const
+ {
+ return this->event_.handle ();
+ }
+
+ ACE_Event event_;
+};
+
+class ACE_WFMO_Reactor_Test
+{
+public:
+ static void check_for_valid_state (ACE_WFMO_Reactor &wfmo_reactor,
+ size_t handles_to_be_added,
+ size_t handles_to_be_suspended,
+ size_t handles_to_be_resumed,
+ size_t handles_to_be_deleted)
+ {
+ ACE_ASSERT (wfmo_reactor.handler_rep_.handles_to_be_added_ == handles_to_be_added);
+ ACE_ASSERT (wfmo_reactor.handler_rep_.handles_to_be_suspended_ == handles_to_be_suspended);
+ ACE_ASSERT (wfmo_reactor.handler_rep_.handles_to_be_resumed_ == handles_to_be_resumed);
+ ACE_ASSERT (wfmo_reactor.handler_rep_.handles_to_be_deleted_ == handles_to_be_deleted);
+ }
+};
+
+int
+main (int argc, char *argv[])
+{
+ Event_Handler handler;
+ ACE_WFMO_Reactor reactor;
+ ACE_Reactor base_reactor (&reactor);
+ ACE_Time_Value time (1);
+
+ int result =
+ reactor.register_handler (&handler);
+ ACE_ASSERT (result == 0);
+
+ ACE_WFMO_Reactor_Test::check_for_valid_state (reactor,
+ 1, 0, 0, 0);
+
+ result =
+ reactor.remove_handler (&handler,
+ ACE_Event_Handler::DONT_CALL |
+ ACE_Event_Handler::ALL_EVENTS_MASK);
+ ACE_ASSERT (result == 0);
+
+ ACE_WFMO_Reactor_Test::check_for_valid_state (reactor,
+ 1, 0, 0, 1);
+
+ result = base_reactor.run_reactor_event_loop (time);
+ ACE_ASSERT (result != -1);
+
+ ACE_WFMO_Reactor_Test::check_for_valid_state (reactor,
+ 0, 0, 0, 0);
+
+ result =
+ reactor.register_handler (&handler);
+ ACE_ASSERT (result == 0);
+
+ ACE_WFMO_Reactor_Test::check_for_valid_state (reactor,
+ 1, 0, 0, 0);
+
+ result = base_reactor.run_reactor_event_loop (time);
+ ACE_ASSERT (result != -1);
+
+ ACE_WFMO_Reactor_Test::check_for_valid_state (reactor,
+ 0, 0, 0, 0);
+
+ result =
+ reactor.suspend_handler (&handler);
+ ACE_ASSERT (result == 0);
+
+ ACE_WFMO_Reactor_Test::check_for_valid_state (reactor,
+ 0, 1, 0, 0);
+
+ result =
+ reactor.remove_handler (&handler,
+ ACE_Event_Handler::DONT_CALL |
+ ACE_Event_Handler::ALL_EVENTS_MASK);
+ ACE_ASSERT (result == 0);
+
+ ACE_WFMO_Reactor_Test::check_for_valid_state (reactor,
+ 0, 0, 0, 1);
+
+ result = base_reactor.run_reactor_event_loop (time);
+ ACE_ASSERT (result != -1);
+
+ ACE_WFMO_Reactor_Test::check_for_valid_state (reactor,
+ 0, 0, 0, 0);
+
+ result =
+ reactor.register_handler (&handler);
+ ACE_ASSERT (result == 0);
+
+ ACE_WFMO_Reactor_Test::check_for_valid_state (reactor,
+ 1, 0, 0, 0);
+
+ result =
+ reactor.suspend_handler (&handler);
+ ACE_ASSERT (result == 0);
+
+ ACE_WFMO_Reactor_Test::check_for_valid_state (reactor,
+ 1, 1, 0, 0);
+
+ result = base_reactor.run_reactor_event_loop (time);
+ ACE_ASSERT (result != -1);
+
+ ACE_WFMO_Reactor_Test::check_for_valid_state (reactor,
+ 0, 0, 0, 0);
+
+ result =
+ reactor.resume_handler (&handler);
+ ACE_ASSERT (result == 0);
+
+ ACE_WFMO_Reactor_Test::check_for_valid_state (reactor,
+ 0, 0, 1, 0);
+
+ result =
+ reactor.remove_handler (&handler,
+ ACE_Event_Handler::DONT_CALL |
+ ACE_Event_Handler::ALL_EVENTS_MASK);
+ ACE_ASSERT (result == 0);
+
+ ACE_WFMO_Reactor_Test::check_for_valid_state (reactor,
+ 0, 0, 0, 1);
+
+ result = base_reactor.run_reactor_event_loop (time);
+ ACE_ASSERT (result != -1);
+
+ ACE_WFMO_Reactor_Test::check_for_valid_state (reactor,
+ 0, 0, 0, 0);
+
+ return 0;
+}
diff --git a/examples/Reactor/WFMO_Reactor/test_talker.cpp b/examples/Reactor/WFMO_Reactor/test_talker.cpp
new file mode 100644
index 00000000000..84ccb8a78b6
--- /dev/null
+++ b/examples/Reactor/WFMO_Reactor/test_talker.cpp
@@ -0,0 +1,559 @@
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// examples
+//
+// = FILENAME
+// test_talker.cpp
+//
+// = DESCRIPTION
+//
+// This test application tests a wide range of events that can be
+// demultiplexed using various ACE utilities. Events used include
+// ^C events, reading from STDIN, vanilla Win32 events, thread
+// exits, Reactor notifications, proactive reads, and proactive
+// writes.
+//
+// The proactive I/O events are demultiplexed by the ACE_Proactor.
+// The thread exits, notications, and vanilla Win32 events are
+// demultiplexed by the ACE_Reactor. To enable a single thread
+// to run all these events, the Proactor is integrated with the
+// Reactor.
+//
+// The test application prototypes a simple talk program. Two
+// instances of the application connect. Input from either console
+// is displayed on the others console also. Because of the evils
+// of Win32 STDIN, a separate thread is used to read from STDIN.
+// To test the Proactor and Reactor, I/O between the remote
+// processes is performed proactively and interactions between the
+// STDIN thread and the main thread are performed reactively.
+//
+// The following description of the test application is in two
+// parts. The participants section explains the main components
+// involved in the application. The collaboration section
+// describes how the partipants interact in response to the
+// multiple event types which occur.
+//
+// The Reactor test application has the following participants:
+//
+// . Reactor -- The Reactor demultiplexes Win32 "waitable"
+// events using WaitForMultipleObjects.
+//
+// . Proactor -- The proactor initiates and demultiplexes
+// overlapped I/O operations. The Proactor registers with the
+// Reactor so that a single-thread can demultiplex all
+// application events.
+//
+// . STDIN_Handler -- STDIN_Handler is an Active Object which reads
+// from STDIN and forwards the input to the Peer_Handler. This
+// runs in a separate thread to make the test more interesting.
+// However, STDIN is "waitable", so in general it can be waited on
+// by the ACE Reactor, thanks MicroSlush!
+//
+// . Peer_Handler -- The Peer_Handler connects to another instance
+// of test_reactor. It Proactively reads and writes data to the
+// peer. When the STDIN_Handler gives it messages, it fowards them
+// to the remote peer. When it receives messages from the remote
+// peer, it prints the output to the console.
+//
+// The collaborations of the participants are as follows:
+//
+// . Initialization
+//
+// Peer_Handler -- connects to the remote peer. It then begins
+// proactively reading from the remote connection. Note that it
+// will be notified by the Proactor when a read completes. It
+// also registers a notification strategy with message queue so
+// that it is notified when the STDIN_Handler posts a message
+// onto the queue.
+//
+// STDIN_Handler -- STDIN_Handler registers a signal handler for
+// SIGINT. This just captures the exception so that the kernel
+// doesn't kill our process; We want to exit gracefully. It also
+// creates an Exit_Hook object which registers the
+// STDIN_Handler's thread handle with the Reactor. The
+// Exit_Hook will get called back when the STDIN_Handler thread
+// exits. After registering these, it blocks reading from STDIN.
+//
+// Proactor -- is registered with the Reactor.
+//
+// The main thread of control waits in the Reactor.
+//
+// . STDIN events -- When the STDIN_Handler thread reads from
+// STDIN, it puts the message on Peer_Handler's message queue. It
+// then returns to reading from STDIN.
+//
+// . Message enqueue -- The Reactor thread wakes up and calls
+// Peer_Handler::handle_output. The Peer_Handler then tries to
+// dequeue a message from its message queue. If it can, the
+// message is Proactively sent to the remote peer. Note that the
+// Peer_Handler will be notified with this operation is complete.
+// The Peer_Handler then falls back into the Reactor event loop.
+//
+// . Send complete event -- When a proactive send is complete, the
+// Proactor is notified by the Reactor. The Proactor, in turn,
+// notifies the Peer_Handler. The Peer_Handler then checks for
+// more messages from the message queue. If there are any, it
+// tries to send them. If there are not, it returns to the
+// Reactor event loop.
+//
+// . Read complete event -- When a proactive read is complete (the
+// Peer_Handler initiated a proactive read when it connected to the
+// remote peer), the Proactor is notified by the Reactor. The
+// Proactor, in turn notifies the Peer_Handler. If the read was
+// successful the Peer_Handler just displays the received msg to
+// the console and reinvokes a proactive read from the network
+// connection. If the read failed (i.e. the remote peer exited),
+// the Peer_Handler sets a flag to end the event loop and returns.
+// This will cause the application to exit.
+//
+// . ^C events -- When the user types ^C at the console, the
+// STDIN_Handler's signal handler will be called. It does nothing,
+// but as a result of the signal, the STDIN_Handler thread will
+// exit.
+//
+// . STDIN_Handler thread exits -- The Exit_Hook will get called
+// back from the Reactor. Exit_Hook::handle_signal sets a flag
+// to end the event loop and returns. This will cause the
+// application to exit.
+//
+//
+// To run example, start an instance of the test with an optional
+// local port argument (as the acceptor). Start the other instance
+// with -h <hostname> and -p <server port>. Type in either the
+// client or server windows and your message should show up in the
+// other window. Control C to exit.
+//
+// = AUTHOR
+// Tim Harrison
+// Irfan Pyarali
+//
+// ============================================================================
+
+#include "ace/Reactor.h"
+#include "ace/Reactor_Notification_Strategy.h"
+#include "ace/WIN32_Proactor.h"
+#include "ace/Proactor.h"
+#include "ace/SOCK_Connector.h"
+#include "ace/SOCK_Acceptor.h"
+#include "ace/Get_Opt.h"
+#include "ace/Service_Config.h"
+#include "ace/Synch.h"
+#include "ace/Task.h"
+
+ACE_RCSID(WFMO_Reactor, test_talker, "$Id$")
+
+typedef ACE_Task<ACE_MT_SYNCH> MT_TASK;
+
+class Peer_Handler : public MT_TASK, public ACE_Handler
+// = TITLE
+// Connect to a server. Receive messages from STDIN_Handler
+// and forward them to the server using proactive I/O.
+{
+public:
+ // = Initialization methods.
+ Peer_Handler (int argc, char *argv[]);
+ ~Peer_Handler (void);
+
+ int open (void * =0);
+ // This method creates the network connection to the remote peer.
+ // It does blocking connects and accepts depending on whether a
+ // hostname was specified from the command line.
+
+ virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);
+ // This method will be called when an asynchronous read completes on a stream.
+ // The remote peer has sent us something. If it succeeded, print
+ // out the message and reinitiate a read. Otherwise, fail. In both
+ // cases, delete the message sent.
+
+ virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);
+ // This method will be called when an asynchronous write completes on a strea_m.
+ // One of our asynchronous writes to the remote peer has completed.
+ // Make sure it succeeded and then delete the message.
+
+ virtual ACE_HANDLE handle (void) const;
+ // Get the I/O handle used by this <handler>. This method will be
+ // called by the ACE_Asynch_* classes when an ACE_INVALID_HANDLE is
+ // passed to <open>.
+
+ virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask);
+ // We've been removed from the Reactor.
+
+ virtual int handle_output (ACE_HANDLE fd);
+ // Called when output events should start. Note that this is
+ // automatically invoked by the
+ // <ACE_Reactor_Notificiation_Strategy>.
+
+private:
+ ACE_SOCK_Stream stream_;
+ // Socket that we have connected to the server.
+
+ ACE_Reactor_Notification_Strategy strategy_;
+ // The strategy object that the reactor uses to notify us when
+ // something is added to the queue.
+
+ // = Remote peer info.
+ char *host_;
+ // Name of remote host.
+
+ u_short port_;
+ // Port number for remote host.
+
+ ACE_Asynch_Read_Stream rd_stream_;
+ // Read stream
+
+ ACE_Asynch_Write_Stream wr_stream_;
+ // Write stream
+
+ ACE_Message_Block mb_;
+ // Message Block for reading from the network
+};
+
+class STDIN_Handler : public ACE_Task<ACE_NULL_SYNCH>
+// = TITLE
+// Active Object. Reads from STDIN and passes message blocks to
+// the peer handler.
+{
+public:
+ STDIN_Handler (MT_TASK &ph);
+ // Initialization.
+
+ virtual int open (void * = 0);
+ // Activate object.
+
+ virtual int close (u_long = 0);
+ // Shut down.
+
+ int svc (void);
+ // Thread runs here as an active object.
+
+private:
+ static void handler (int signum);
+ // Handle a ^C. (Do nothing, this just illustrates how we can catch
+ // signals along with the other things).
+
+ void register_thread_exit_hook (void);
+ // Helper function to register with the Reactor for thread exit.
+
+ virtual int handle_signal (int index, siginfo_t *, ucontext_t *);
+ // The STDIN thread has exited. This means the user hit ^C. We can
+ // end the event loop.
+
+ MT_TASK &ph_;
+ // Send all input to ph_.
+
+ ACE_HANDLE thr_handle_;
+ // Handle of our thread.
+};
+
+Peer_Handler::Peer_Handler (int argc, char *argv[])
+ : host_ (0),
+ port_ (ACE_DEFAULT_SERVER_PORT),
+ strategy_ (ACE_Reactor::instance (),
+ this,
+ ACE_Event_Handler::WRITE_MASK),
+ mb_ (BUFSIZ)
+{
+ // This code sets up the message to notify us when a new message is
+ // added to the queue. Actually, the queue notifies Reactor which
+ // then notifies us.
+ this->msg_queue ()->notification_strategy (&this->strategy_);
+
+ ACE_Get_Opt get_opt (argc, argv, "h:p:");
+ int c;
+
+ while ((c = get_opt ()) != EOF)
+ {
+ switch (c)
+ {
+ case 'h':
+ host_ = get_opt.opt_arg ();
+ break;
+ case 'p':
+ port_ = ACE_OS::atoi (get_opt.opt_arg ());
+ break;
+ }
+ }
+}
+
+Peer_Handler::~Peer_Handler (void)
+{
+}
+
+// This method creates the network connection to the remote peer. It
+// does blocking connects and accepts depending on whether a hostname
+// was specified from the command line.
+
+int
+Peer_Handler::open (void *)
+{
+ if (host_ != 0) // Connector
+ {
+ ACE_INET_Addr addr (port_, host_);
+ ACE_SOCK_Connector connector;
+
+ // Establish connection with server.
+ if (connector.connect (stream_, addr) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "connect"), -1);
+
+ ACE_DEBUG ((LM_DEBUG, "(%t) connected.\n"));
+ }
+ else // Acceptor
+ {
+ ACE_SOCK_Acceptor acceptor;
+ ACE_INET_Addr local_addr (port_);
+
+ if ((acceptor.open (local_addr) == -1) ||
+ (acceptor.accept (this->stream_) == -1))
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "accept failed"), -1);
+
+ ACE_DEBUG ((LM_DEBUG, "(%t) accepted.\n"));
+ }
+
+ int result = this->rd_stream_.open (*this);
+ if (result != 0)
+ return result;
+
+ result = this->wr_stream_.open (*this);
+ if (result != 0)
+ return result;
+
+ result = this->rd_stream_.read (this->mb_,
+ this->mb_.size ());
+ return result;
+}
+
+// One of our asynchronous writes to the remote peer has completed.
+// Make sure it succeeded and then delete the message.
+
+void
+Peer_Handler::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
+{
+ if (result.bytes_transferred () <= 0)
+ ACE_DEBUG ((LM_DEBUG, "(%t) %p bytes = %d\n", "Message failed",
+ result.bytes_transferred ()));
+
+ // This was allocated by the STDIN_Handler, queued, dequeued, passed
+ // to the proactor, and now passed back to us.
+ result.message_block ().release ();
+}
+
+// The remote peer has sent us something. If it succeeded, print
+// out the message and reinitiate a read. Otherwise, fail. In both
+// cases, delete the message sent.
+
+
+void
+Peer_Handler::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
+{
+ if (result.bytes_transferred () > 0 &&
+ this->mb_.length () > 0)
+ {
+ this->mb_.rd_ptr ()[result.bytes_transferred ()] = '\0';
+ // Print out the message received from the server.
+ ACE_DEBUG ((LM_DEBUG, "%s", this->mb_.rd_ptr ()));
+ }
+ else
+ {
+ // If a read failed, we will assume it's because the remote peer
+ // went away. We will end the event loop. Since we're in the
+ // main thread, we don't need to do a notify.
+ ACE_Reactor::end_event_loop();
+ return;
+ }
+
+ // Reset pointers
+ this->mb_.wr_ptr (this->mb_.wr_ptr () - result.bytes_transferred ());
+
+ // Start off another read
+ if (this->rd_stream_.read (this->mb_,
+ this->mb_.size ()) == -1)
+ ACE_ERROR ((LM_ERROR, "%p Read initiate.\n", "Peer_Handler"));
+}
+
+// This is so the Proactor can get our handle.
+ACE_HANDLE
+Peer_Handler::handle (void) const
+{
+ return this->stream_.get_handle ();
+}
+
+// We've been removed from the Reactor.
+int
+Peer_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask)
+{
+ ACE_DEBUG ((LM_DEBUG, "(%t) Peer_Handler closing down\n"));
+ return 0;
+}
+
+// New stuff added to the message queue. Try to dequeue a message.
+int
+Peer_Handler::handle_output (ACE_HANDLE fd)
+{
+ ACE_Message_Block *mb;
+
+ ACE_Time_Value tv (ACE_Time_Value::zero);
+
+ // Forward the message to the remote peer receiver.
+ if (this->getq (mb, &tv) != -1)
+ {
+ if (this->wr_stream_.write (*mb,
+ mb->length ()) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p Write initiate.\n", "Peer_Handler"), -1);
+ }
+ return 0;
+}
+
+void
+STDIN_Handler::handler (int signum)
+{
+ ACE_DEBUG ((LM_DEBUG, "(%t) signal = %S\n", signum));
+}
+
+STDIN_Handler::STDIN_Handler (MT_TASK &ph)
+ : ph_ (ph)
+{
+ // Register for ^C from the console. We just need to catch the
+ // exception so that the kernel doesn't kill our process.
+ // Registering this signal handler just tells the kernel that we
+ // know what we're doing; to leave us alone.
+
+ ACE_OS::signal (SIGINT, (ACE_SignalHandler) STDIN_Handler::handler);
+};
+
+// Activate object.
+
+int
+STDIN_Handler::open (void *)
+{
+ if (this->activate (THR_NEW_LWP | THR_DETACHED) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "spawn"), -1);
+
+ return 0;
+}
+
+// Shut down.
+
+int
+STDIN_Handler::close (u_long)
+{
+ ACE_DEBUG ((LM_DEBUG, "(%t) thread is exiting.\n"));
+ return 0;
+}
+
+// Thread runs here.
+
+int
+STDIN_Handler::svc (void)
+{
+ this->register_thread_exit_hook ();
+
+ for (;;)
+ {
+ ACE_Message_Block *mb = new ACE_Message_Block (BUFSIZ);
+
+ // Read from stdin into mb.
+ int read_result = ACE_OS::read (ACE_STDIN,
+ mb->rd_ptr (),
+ mb->size ());
+
+ // If read succeeds, put mb to peer handler, else end the loop.
+ if (read_result > 0)
+ {
+ mb->wr_ptr (read_result);
+ // Note that this call will first enqueue mb onto the peer
+ // handler's message queue, which will then turn around and
+ // notify the Reactor via the Notification_Strategy. This
+ // will subsequently signal the Peer_Handler, which will
+ // react by calling back to its handle_output() method,
+ // which dequeues the message and sends it to the peer
+ // across the network.
+ this->ph_.putq (mb);
+ }
+ else
+ {
+ mb->release ();
+ break;
+ }
+ }
+
+ // handle_signal will get called on the main proactor thread since
+ // we just exited and the main thread is waiting on our thread exit.
+ return 0;
+}
+
+// Register an exit hook with the reactor.
+
+void
+STDIN_Handler::register_thread_exit_hook (void)
+{
+ // Get a real handle to our thread.
+ ACE_Thread_Manager::instance ()->thr_self (this->thr_handle_);
+
+ // Register ourselves to get called back when our thread exits.
+
+ if (ACE_Reactor::instance ()->
+ register_handler (this, this->thr_handle_) == -1)
+ ACE_ERROR ((LM_ERROR, "Exit_Hook Register failed.\n"));
+}
+
+// The STDIN thread has exited. This means the user hit ^C. We can
+// end the event loop and delete ourself.
+
+int
+STDIN_Handler::handle_signal (int, siginfo_t *si, ucontext_t *)
+{
+ ACE_ASSERT (this->thr_handle_ == si->si_handle_);
+ ACE_Reactor::end_event_loop ();
+ return 0;
+}
+
+int
+main (int argc, char *argv[])
+{
+ // Let the proactor know that it will be used with Reactor
+ // Create specific proactor
+ ACE_WIN32_Proactor win32_proactor (0, 1);
+ // Get the interface proactor
+ ACE_Proactor proactor (&win32_proactor);
+ // Put it as the instance.
+ ACE_Proactor::instance (&proactor);
+
+ // Open handler for remote peer communications this will run from
+ // the main thread.
+ Peer_Handler peer_handler (argc, argv);
+
+ if (peer_handler.open () == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p open failed, errno = %d.\n",
+ "peer_handler", errno), 0);
+
+ // Open active object for reading from stdin.
+ STDIN_Handler stdin_handler (peer_handler);
+
+ // Spawn thread.
+ if (stdin_handler.open () == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p open failed, errno = %d.\n",
+ "stdin_handler", errno), 0);
+
+ // Register proactor with Reactor so that we can demultiplex
+ // "waitable" events and I/O operations from a single thread.
+ if (ACE_Reactor::instance ()->register_handler
+ (ACE_Proactor::instance ()->implementation ()) != 0)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p failed to register Proactor.\n",
+ argv[0]), -1);
+
+ // Run main event demultiplexor.
+ ACE_Reactor::run_event_loop ();
+
+ // Remove proactor with Reactor.
+ if (ACE_Reactor::instance ()->remove_handler
+ (ACE_Proactor::instance (), ACE_Event_Handler::DONT_CALL) != 0)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p failed to register Proactor.\n",
+ argv[0]), -1);
+
+ return 0;
+}
diff --git a/examples/Reactor/WFMO_Reactor/test_timeouts.cpp b/examples/Reactor/WFMO_Reactor/test_timeouts.cpp
new file mode 100644
index 00000000000..8fa41f46a66
--- /dev/null
+++ b/examples/Reactor/WFMO_Reactor/test_timeouts.cpp
@@ -0,0 +1,80 @@
+// $Id$
+//
+// ============================================================================
+//
+// = LIBRARY
+// examples
+//
+// = FILENAME
+// test_timeouts.cpp
+//
+// = DESCRIPTION
+//
+// This example application shows how to write Reactor event
+// loops that handle events for some fixed amount of time.
+//
+// Run this example (without arguments) to see the timers
+// expire. The order should be:
+//
+// foo, bar, foo, bar, foo, foo, bar, foo, bar, foo
+//
+// = AUTHOR
+// Tim Harrison
+// Irfan Pyarali
+//
+// ============================================================================
+
+#include "ace/Reactor.h"
+#include "ace/Service_Config.h"
+#include "ace/OS.h"
+
+ACE_RCSID(WFMO_Reactor, test_timeouts, "$Id$")
+
+class Timeout_Handler : public ACE_Event_Handler
+// = TITLE
+// Generic timeout handler.
+{
+public:
+ Timeout_Handler (void)
+ : count_ (0) {}
+
+ virtual int handle_timeout (const ACE_Time_Value &tv,
+ const void *arg)
+ // Print out when timeouts occur.
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "%d timeout occurred for %s.\n",
+ ++count_,
+ (char *) arg));
+ return 0;
+ }
+
+private:
+ int count_;
+};
+
+int
+main (int, char *[])
+{
+ Timeout_Handler handler;
+
+ // Register a 3 second timer.
+ ACE_Time_Value bar_tv (3);
+ ACE_Reactor::instance ()->schedule_timer (&handler,
+ (void *) "Bar",
+ bar_tv,
+ bar_tv);
+
+ // Register a 2 second timer.
+ ACE_Time_Value foo_tv (2);
+ ACE_Reactor::instance ()->schedule_timer (&handler,
+ (void *) "Foo",
+ foo_tv,
+ foo_tv);
+ // Handle events for 12 seconds.
+ ACE_Time_Value run_time (12);
+ if (ACE_Reactor::run_event_loop(run_time) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p.\n", "main"), -1);
+
+ return 0;
+}
diff --git a/examples/Reactor/WFMO_Reactor/test_window_messages.cpp b/examples/Reactor/WFMO_Reactor/test_window_messages.cpp
new file mode 100644
index 00000000000..f46dea7ad19
--- /dev/null
+++ b/examples/Reactor/WFMO_Reactor/test_window_messages.cpp
@@ -0,0 +1,90 @@
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// examples
+//
+// = FILENAME
+// test_window_messages.cpp
+//
+// = DESCRIPTION
+//
+// Tests the Msg_WFMO_Reactor's ability to handle regular events
+// and window messages.
+//
+// = AUTHOR
+//
+// Irfan Pyarali <irfan@cs.wustl.edu>
+//
+// ============================================================================
+
+#include "ace/Msg_WFMO_Reactor.h"
+#include "ace/Reactor.h"
+#include "ace/Auto_Ptr.h"
+
+ACE_RCSID(WFMO_Reactor, test_window_messages, "$Id$")
+
+class Event_Handler : public ACE_Event_Handler
+{
+public:
+ int handle_signal (int signum, siginfo_t * = 0, ucontext_t * = 0);
+
+ ACE_Auto_Event handle_;
+ int iterations_;
+};
+
+int
+Event_Handler::handle_signal (int signum, siginfo_t *, ucontext_t *)
+{
+ --this->iterations_;
+
+ if (this->iterations_ == 0)
+ ACE_Reactor::end_event_loop ();
+
+ return 0;
+}
+
+static Event_Handler *global_event_handler;
+
+void WINAPI
+timer_callback (HWND hwnd,
+ UINT uMsg,
+ UINT idEvent,
+ DWORD dwTime)
+{
+ ACE_DEBUG ((LM_DEBUG, "(%t) timeout occured @ %d\n", dwTime));
+
+ global_event_handler->handle_.signal ();
+}
+
+int
+main (int argc, char** argv)
+{
+ // Manage memory automagically.
+ // Note that ordering here is important.
+ ACE_Reactor_Impl *impl = new ACE_Msg_WFMO_Reactor;
+ auto_ptr<ACE_Reactor> reactor (new ACE_Reactor (impl));
+ ACE_Reactor::instance (reactor.get ());
+ auto_ptr<ACE_Reactor_Impl> delete_impl (impl);
+
+ Event_Handler event_handler;
+ global_event_handler = &event_handler;
+
+ event_handler.iterations_ = 5;
+ int result = ACE_Reactor::instance ()->register_handler (&event_handler,
+ event_handler.handle_.handle ());
+ ACE_ASSERT (result == 0);
+
+ ACE_Time_Value timeout (1);
+ result = ::SetTimer (NULL, // handle of window for timer messages
+ NULL, // timer identifier
+ timeout.msec (), // time-out value
+ (TIMERPROC) &timer_callback // address of timer procedure
+ );
+ ACE_ASSERT (result != 0);
+
+ ACE_Reactor::run_event_loop ();
+
+ return 0;
+}