diff options
Diffstat (limited to 'examples')
17 files changed, 2681 insertions, 0 deletions
diff --git a/examples/Reactor/WFMO_Reactor/Abondoned.dsp b/examples/Reactor/WFMO_Reactor/Abondoned.dsp new file mode 100644 index 00000000000..d113db90ba6 --- /dev/null +++ b/examples/Reactor/WFMO_Reactor/Abondoned.dsp @@ -0,0 +1,58 @@ +# Microsoft Developer Studio Project File - Name="Abondoned" - Package Owner=<4>
+# Microsoft Developer Studio Generated Build File, Format Version 6.00
+# ** DO NOT EDIT **
+
+# TARGTYPE "Win32 (x86) Console Application" 0x0103
+
+CFG=Abondoned - Win32 Debug
+!MESSAGE This is not a valid makefile. To build this project using NMAKE,
+!MESSAGE use the Export Makefile command and run
+!MESSAGE
+!MESSAGE NMAKE /f "Abondoned.mak".
+!MESSAGE
+!MESSAGE You can specify a configuration when running NMAKE
+!MESSAGE by defining the macro CFG on the command line. For example:
+!MESSAGE
+!MESSAGE NMAKE /f "Abondoned.mak" CFG="Abondoned - Win32 Debug"
+!MESSAGE
+!MESSAGE Possible choices for configuration are:
+!MESSAGE
+!MESSAGE "Abondoned - Win32 Debug" (based on "Win32 (x86) Console Application")
+!MESSAGE
+
+# Begin Project
+# PROP Scc_ProjName ""
+# PROP Scc_LocalPath ""
+CPP=cl.exe
+RSC=rc.exe
+# PROP BASE Use_MFC 0
+# PROP BASE Use_Debug_Libraries 1
+# PROP BASE Output_Dir "Abondone"
+# PROP BASE Intermediate_Dir "Abondone"
+# PROP BASE Target_Dir ""
+# PROP Use_MFC 0
+# PROP Use_Debug_Libraries 1
+# PROP Output_Dir ""
+# PROP Intermediate_Dir "Debug"
+# PROP Ignore_Export_Lib 0
+# PROP Target_Dir ""
+# ADD BASE CPP /nologo /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /D "_MBCS" /YX /FD /c
+# ADD CPP /nologo /MDd /W3 /Gm /GX /Zi /Od /I "..\..\..\\" /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /D "_MBCS" /FD /c
+# SUBTRACT CPP /YX
+# ADD BASE RSC /l 0x409 /d "_DEBUG"
+# ADD RSC /l 0x409 /d "_DEBUG"
+BSC32=bscmake.exe
+# ADD BASE BSC32 /nologo
+# ADD BSC32 /nologo
+LINK32=link.exe
+# ADD BASE LINK32 kernel32.lib user32.lib gdi32.lib winspool.lib comdlg32.lib advapi32.lib shell32.lib ole32.lib oleaut32.lib uuid.lib odbc32.lib odbccp32.lib /nologo /subsystem:console /debug /machine:I386 /pdbtype:sept
+# ADD LINK32 aced.lib /nologo /subsystem:console /debug /machine:I386 /pdbtype:sept /libpath:"..\..\..\ace"
+# Begin Target
+
+# Name "Abondoned - Win32 Debug"
+# Begin Source File
+
+SOURCE=.\test_abandoned.cpp
+# End Source File
+# End Target
+# End Project
diff --git a/examples/Reactor/WFMO_Reactor/test_abandoned.cpp b/examples/Reactor/WFMO_Reactor/test_abandoned.cpp new file mode 100644 index 00000000000..b3c6c14b86c --- /dev/null +++ b/examples/Reactor/WFMO_Reactor/test_abandoned.cpp @@ -0,0 +1,121 @@ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// examples +// +// = FILENAME +// test_abandoned.cpp +// +// = DESCRIPTION +// +// Tests the WFMO_Reactor's ability to handle abandoned mutexes. +// +// = AUTHOR +// +// Irfan Pyarali <irfan@cs.wustl.edu> +// +// ============================================================================ + +#include "ace/Reactor.h" +#include "ace/Thread_Manager.h" +#include "ace/Process_Mutex.h" + +ACE_RCSID(WFMO_Reactor, test_abandoned, "$Id$") + +class Event_Handler : public ACE_Event_Handler +{ +public: + int handle_signal (int signum, + siginfo_t * = 0, + ucontext_t * = 0); + + int handle_timeout (const ACE_Time_Value &tv, + const void *arg = 0); + + ACE_Auto_Event handle_; + ACE_Process_Mutex *mutex_; + int iterations_; +}; + +static int abandon = 1; + +static void * +worker (void *data) +{ + Event_Handler *handler = (Event_Handler *) data; + + handler->handle_.signal (); + handler->mutex_->acquire (); + + if (!abandon) + handler->mutex_->release (); + + return 0; +} + +int +Event_Handler::handle_signal (int signum, + siginfo_t *s, + ucontext_t *) +{ + HANDLE handle = s->si_handle_; + if (handle == this->handle_.handle ()) + ACE_Reactor::instance ()->register_handler (this, + this->mutex_->lock ().proc_mutex_); + else + { + ACE_Reactor::instance ()->remove_handler (this->mutex_->lock ().proc_mutex_, + ACE_Event_Handler::DONT_CALL); + delete this->mutex_; + } + + return 0; +} + +int +Event_Handler::handle_timeout (const ACE_Time_Value &tv, + const void *arg) +{ + --this->iterations_; + ACE_DEBUG ((LM_DEBUG, + "(%t) timeout occured @ %T, iterations left %d\n", + this->iterations_)); + + if (this->iterations_ == 0) + ACE_Reactor::end_event_loop (); + else + { + ACE_NEW_RETURN (this->mutex_, + ACE_Process_Mutex, + -1); + int result = ACE_Thread_Manager::instance ()->spawn (&worker, this); + ACE_ASSERT (result != -1); + } + + return 0; +} + +int +main (int argc, char *argv[]) +{ + Event_Handler event_handler; + + event_handler.iterations_ = 5; + int result = ACE_Reactor::instance ()->register_handler + (&event_handler, + event_handler.handle_.handle ()); + ACE_ASSERT (result == 0); + + ACE_Time_Value timeout (2); + result = ACE_Reactor::instance ()->schedule_timer (&event_handler, + 0, + timeout, + timeout); + ACE_ASSERT (result != -1); + + ACE_Reactor::run_event_loop (); + + return 0; +} diff --git a/examples/Reactor/WFMO_Reactor/test_apc.cpp b/examples/Reactor/WFMO_Reactor/test_apc.cpp new file mode 100644 index 00000000000..5a2d79a4de5 --- /dev/null +++ b/examples/Reactor/WFMO_Reactor/test_apc.cpp @@ -0,0 +1,105 @@ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// examples +// +// = FILENAME +// test_apc.cpp +// +// = DESCRIPTION +// +// Tests the WFMO_Reactor's ability to handle regular APC +// notifications. +// +// = AUTHOR +// +// Irfan Pyarali <irfan@cs.wustl.edu> +// +// ============================================================================ + +#include "ace/Reactor.h" + +ACE_RCSID(WFMO_Reactor, test_apc, "$Id$") + +class Event_Handler : public ACE_Event_Handler +{ +public: + int handle_signal (int signum, + siginfo_t * = 0, + ucontext_t * = 0); + + int handle_timeout (const ACE_Time_Value &tv, + const void *arg = 0); + + ACE_Auto_Event handle_; + int iterations_; +}; + +static Event_Handler *global_event_handler; + +static void WINAPI +apc_callback (DWORD) +{ + ACE_DEBUG ((LM_DEBUG, + "(%t) apc occured @ %T\n")); + + global_event_handler->handle_.signal (); +} + +void +queue_apc (void) +{ + DWORD result = ::QueueUserAPC (&apc_callback, // pointer to APC function + ::GetCurrentThread (), // handle to the thread + 0); // argument for the APC function + if (result == FALSE) + ACE_OS::exit (-1); +} + +int +Event_Handler::handle_signal (int signum, + siginfo_t *, + ucontext_t *) +{ + --this->iterations_; + + if (this->iterations_ == 0) + ACE_Reactor::end_event_loop (); + + return 0; +} + +int +Event_Handler::handle_timeout (const ACE_Time_Value &tv, + const void *arg) +{ + ACE_DEBUG ((LM_DEBUG, + "(%t) timeout occured @ %T\n")); + queue_apc (); + return 0; +} + +int +main (int argc, char *argv[]) +{ + Event_Handler event_handler; + event_handler.iterations_ = 5; + global_event_handler = &event_handler; + + int result = ACE_Reactor::instance ()->register_handler (&event_handler, + event_handler.handle_.handle ()); + ACE_ASSERT (result == 0); + + ACE_Time_Value timeout (2); + result = ACE_Reactor::instance ()->schedule_timer (&event_handler, + 0, + timeout, + timeout); + ACE_ASSERT (result != -1); + + ACE_Reactor::run_alertable_event_loop (); + + return 0; +} diff --git a/examples/Reactor/WFMO_Reactor/test_console_input.cpp b/examples/Reactor/WFMO_Reactor/test_console_input.cpp new file mode 100644 index 00000000000..adc9915b0a2 --- /dev/null +++ b/examples/Reactor/WFMO_Reactor/test_console_input.cpp @@ -0,0 +1,84 @@ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// examples +// +// = FILENAME +// test_console_input.cpp +// +// = DESCRIPTION +// +// This application tests the working of WFMO_Reactor when users +// are interested in console input. +// +// = AUTHOR +// Irfan Pyarali <irfan@cs.wustl.edu> +// +// ============================================================================ + +#include "ace/Reactor.h" + +ACE_RCSID(WFMO_Reactor, test_console_input, "$Id$") + +class Event_Handler : public ACE_Event_Handler +{ +public: + Event_Handler (ACE_Reactor &reactor); + int handle_signal (int signum, siginfo_t * = 0, ucontext_t * = 0); + int handle_close (ACE_HANDLE handle, + ACE_Reactor_Mask close_mask); +}; + +Event_Handler::Event_Handler (ACE_Reactor &reactor) +{ + this->reactor (&reactor); + + if (this->reactor ()->register_handler (this, + ACE_STDIN) != 0) + ACE_ERROR ((LM_ERROR, + "Registration with Reactor could not be done\n")); +} + +int +Event_Handler::handle_signal (int signum, siginfo_t *, ucontext_t *) +{ + ACE_TCHAR buffer[BUFSIZ]; + int result = ACE_OS::read (ACE_STDIN, buffer, sizeof buffer); + buffer[result] = '\0'; + + if (result <= 0) + { + this->reactor ()->close (); + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "ACE_OS::read"), -1); + } + + if (ACE_OS::strcmp (ACE_TEXT("quit\r\n"), buffer) == 0) + this->reactor ()->close (); + + ACE_DEBUG ((LM_DEBUG, "User input: %s", buffer)); + + return 0; +} + +int +Event_Handler::handle_close (ACE_HANDLE handle, + ACE_Reactor_Mask close_mask) +{ + ACE_DEBUG ((LM_DEBUG, "Event_Handler removed from Reactor\n")); + return 0; +} + +int +main (int, char *[]) +{ + ACE_Reactor reactor; + Event_Handler handler (reactor); + + int result = 0; + while (result != -1) + result = reactor.handle_events (); + + return 0; +} diff --git a/examples/Reactor/WFMO_Reactor/test_directory_changes.cpp b/examples/Reactor/WFMO_Reactor/test_directory_changes.cpp new file mode 100644 index 00000000000..48c486380cb --- /dev/null +++ b/examples/Reactor/WFMO_Reactor/test_directory_changes.cpp @@ -0,0 +1,115 @@ +// $Id$ +// +// ============================================================================ +// +// = LIBRARY +// examples +// +// = FILENAME +// test_directory_changes.cpp +// +// = DESCRIPTION +// +// This application tests the working of WFMO_Reactor when users +// are interested in monitoring changes in the filesystem. +// +// = AUTHOR +// Irfan Pyarali +// +// ============================================================================ + +#include "ace/Reactor.h" + +ACE_RCSID(WFMO_Reactor, test_directory_changes, "$Id$") + +static int stop_test = 0; +static const ACE_TCHAR *directory = ACE_TEXT ("."); +static const ACE_TCHAR *temp_file = ACE_TEXT ("foo"); + +class Event_Handler : public ACE_Event_Handler +{ +public: + Event_Handler (ACE_Reactor &reactor); + ~Event_Handler (void); + int handle_signal (int signum, siginfo_t * = 0, ucontext_t * = 0); + int handle_close (ACE_HANDLE handle, + ACE_Reactor_Mask close_mask); + +private: + ACE_HANDLE handle_; +}; + +Event_Handler::Event_Handler (ACE_Reactor &reactor) + : handle_ (ACE_INVALID_HANDLE) +{ + this->reactor (&reactor); + + int change_notification_flags = FILE_NOTIFY_CHANGE_FILE_NAME; + + this->handle_ = ACE_TEXT_FindFirstChangeNotification (directory, // pointer to name of directory to watch + FALSE, // flag for monitoring directory or directory tree + change_notification_flags // filter conditions to watch for + ); + if (this->handle_ == ACE_INVALID_HANDLE) + ACE_ERROR ((LM_ERROR, "FindFirstChangeNotification could not be setup\n")); + + if (this->reactor ()->register_handler (this, + this->handle_) != 0) + ACE_ERROR ((LM_ERROR, "Registration with Reactor could not be done\n")); +} + +Event_Handler::~Event_Handler (void) +{ +} + +int +Event_Handler::handle_signal (int signum, siginfo_t *, ucontext_t *) +{ + ::FindNextChangeNotification (this->handle_); + if (stop_test) + this->reactor ()->close (); + return 0; +} + +int +Event_Handler::handle_close (ACE_HANDLE handle, + ACE_Reactor_Mask close_mask) +{ + ACE_DEBUG ((LM_DEBUG, "Event_Handler removed from Reactor\n")); + ::FindCloseChangeNotification (this->handle_); + return 0; +} + +void +worker (void) +{ + ACE_DEBUG ((LM_DEBUG, "(%t) Thread creation\n")); + ACE_DEBUG ((LM_DEBUG, "(%t) Thread creating temporary file\n")); + ACE_HANDLE file = ACE_OS::open (temp_file, _O_CREAT | _O_EXCL); + if (file == ACE_INVALID_HANDLE) + ACE_ERROR ((LM_ERROR, "Error in creating %s: %p\n", temp_file, "ACE_OS::open")); + else + { + ACE_OS::close (file); + ACE_DEBUG ((LM_DEBUG, "(%t) Thread sleeping\n")); + ACE_OS::sleep (3); + ACE_DEBUG ((LM_DEBUG, "(%t) Thread removing temporary file\n")); + stop_test = 1; + ACE_OS::unlink (temp_file); + } +} + +int +main (int, char *[]) +{ + ACE_Reactor reactor; + Event_Handler handler (reactor); + + int result = ACE_OS::thr_create ((ACE_THR_FUNC) worker, 0, 0, 0); + ACE_ASSERT (result == 0); + + for (result = 0; result != -1; result = reactor.handle_events ()) + continue; + + return 0; +} diff --git a/examples/Reactor/WFMO_Reactor/test_exceptions.cpp b/examples/Reactor/WFMO_Reactor/test_exceptions.cpp new file mode 100644 index 00000000000..951959f39e1 --- /dev/null +++ b/examples/Reactor/WFMO_Reactor/test_exceptions.cpp @@ -0,0 +1,97 @@ +// $Id$ +// +// ============================================================================ +// +// = LIBRARY +// examples +// +// = FILENAME +// test_exceptions.cpp +// +// = DESCRIPTION +// +// This test application tests the state of WFMO_Reactor when +// exceptions occurs when executing user callbacks. +// +// The thread count in WFMO_Reactor is used to ensure that state of +// WFMO_Reactor is not fouled up when exceptions occur in user code. +// This example also shows how to write event loops that survive +// user exceptions +// +// = AUTHOR +// Irfan Pyarali +// +// ============================================================================ + +#include "ace/WFMO_Reactor.h" + +ACE_RCSID(WFMO_Reactor, test_exceptions, "$Id$") + +class Event_Handler : public ACE_Event_Handler +{ +public: + Event_Handler (void) + : event_ (1) + { + ACE_DEBUG ((LM_DEBUG, + "Event_Handler created\n")); + } + + ~Event_Handler (void) + { + ACE_DEBUG ((LM_DEBUG, + "Event_Handler destroyed\n")); + } + + int handle_signal (int signum, siginfo_t * = 0, ucontext_t * = 0) + { + char *cause_exception = 0; + char a = *cause_exception; + return 0; + } + + ACE_HANDLE get_handle (void) const + { + return this->event_.handle (); + } +private: + ACE_Manual_Event event_; +}; + +class ACE_WFMO_Reactor_Test +{ +public: + static void doit (ACE_WFMO_Reactor &wfmo_reactor) + { + for (int i = 1; i <= 10; i++) + { + ACE_DEBUG ((LM_DEBUG, + "Active threads in WFMO_Reactor (before handle_events) = %d\n", + wfmo_reactor.active_threads_)); + ACE_SEH_TRY + { + wfmo_reactor.handle_events (); + } + ACE_SEH_EXCEPT (EXCEPTION_EXECUTE_HANDLER) + { + ACE_DEBUG ((LM_DEBUG, + "Exception occurred\n")); + } + ACE_DEBUG ((LM_DEBUG, + "Active threads in WFMO_Reactor (after handle_events) = %d\n", + wfmo_reactor.active_threads_)); + } + } +}; + +int +main (int, char *[]) +{ + Event_Handler handler; + ACE_WFMO_Reactor wfmo_reactor; + wfmo_reactor.register_handler (&handler); + + ACE_WFMO_Reactor_Test::doit (wfmo_reactor); + + return 0; +} diff --git a/examples/Reactor/WFMO_Reactor/test_handle_close.cpp b/examples/Reactor/WFMO_Reactor/test_handle_close.cpp new file mode 100644 index 00000000000..5141262c2a8 --- /dev/null +++ b/examples/Reactor/WFMO_Reactor/test_handle_close.cpp @@ -0,0 +1,306 @@ +// $Id$ +// +// ============================================================================ +// +// = LIBRARY +// examples +// +// = FILENAME +// test_handle_close.cpp +// +// = DESCRIPTION +// +// This application tests whether handle_close gets called and if +// the correct masks are passed along. The handler should get +// handle_close called for all three masks (READ, WRITE, and +// EXCEPT). +// +// = AUTHOR +// Irfan Pyarali +// +// ============================================================================ + +#include "ace/Get_Opt.h" +#include "ace/Reactor.h" +#include "ace/WFMO_Reactor.h" +#include "ace/Select_Reactor.h" +#include "ace/Auto_Ptr.h" +#include "ace/Pipe.h" + +ACE_RCSID(WFMO_Reactor, test_handle_close, "$Id$") + +// Use the WFMO_Reactor +static int opt_wfmo_reactor = 0; + +// Use the Select_Reactor +static int opt_select_reactor = 0; + +// Make pipe readable in main() +static int write_to_pipe_in_main = 0; + +// Cancel reads +static int cancel_reads = 0; + +// Write some data to the pipe. This will cause handle_input to get +// called. +void +write_to_pipe (ACE_Pipe &pipe) +{ + char *data = "hello"; + int len = ACE_OS::strlen (data); + + int result = ACE::send (pipe.write_handle (), + data, + len); + ACE_ASSERT (result == len); +} + +// Simple handler class +class Handler : public ACE_Event_Handler +{ +public: + Handler (ACE_Pipe &pipe) + : pipe_ (pipe) + { + } + + ACE_HANDLE get_handle (void) const + { + return this->pipe_.read_handle (); + } + + int handle_close (ACE_HANDLE handle, + ACE_Reactor_Mask close_mask) + { + ACE_DEBUG ((LM_DEBUG, + "Handler::handle_close called with mask = %d\n", + close_mask)); + return 0; + } + + int handle_input (ACE_HANDLE handle) + { + ACE_DEBUG ((LM_DEBUG, "Handler::handle_input\n")); + + // Remove for reading + return -1; + } + + int handle_output (ACE_HANDLE handle) + { + ACE_DEBUG ((LM_DEBUG, "Handler::handle_output\n")); + + // Optionally cancel reads + if (cancel_reads) + { + int result = ACE_Reactor::instance ()->cancel_wakeup (this, + ACE_Event_Handler::READ_MASK); + ACE_ASSERT (result != -1); + } + + // Write to the pipe; this causes handle_input to get called. + if (!write_to_pipe_in_main) + write_to_pipe (this->pipe_); + + // Remove for writing + return -1; + } + +protected: + ACE_Pipe &pipe_; +}; + +class Different_Handler : public ACE_Event_Handler +{ +public: + + Different_Handler (ACE_Pipe &pipe) + : pipe_ (pipe) + { + } + + ACE_HANDLE get_handle (void) const + { + return this->pipe_.read_handle (); + } + + int handle_close (ACE_HANDLE handle, + ACE_Reactor_Mask close_mask) + { + ACE_DEBUG ((LM_DEBUG, + "Different_Handler::handle_close called with mask = %d\n", + close_mask)); + return 0; + } + + int handle_input (ACE_HANDLE handle) + { + ACE_DEBUG ((LM_DEBUG, "Different_Handler::handle_input\n")); + + // Remove for reading + int result = ACE_Reactor::instance ()->remove_handler (this, + ACE_Event_Handler::READ_MASK); + ACE_ASSERT (result == 0); + + return 0; + } + + int handle_output (ACE_HANDLE handle) + { + ACE_DEBUG ((LM_DEBUG, "Different_Handler::handle_output\n")); + + // Add for reading + int result = ACE_Reactor::instance ()->mask_ops (this, + ACE_Event_Handler::READ_MASK, + ACE_Reactor::ADD_MASK); + ACE_ASSERT (result != -1); + + ACE_Reactor_Mask old_masks = + ACE_Event_Handler::WRITE_MASK | + ACE_Event_Handler::EXCEPT_MASK; + + ACE_ASSERT (old_masks == + ACE_static_cast (ACE_Reactor_Mask, result)); + + // Get new masks + result = ACE_Reactor::instance ()->mask_ops (this, + ACE_Event_Handler::NULL_MASK, + ACE_Reactor::GET_MASK); + ACE_ASSERT (result != -1); + + ACE_Reactor_Mask current_masks = + ACE_Event_Handler::READ_MASK | + ACE_Event_Handler::WRITE_MASK | + ACE_Event_Handler::EXCEPT_MASK; + + ACE_ASSERT (current_masks == + ACE_static_cast (ACE_Reactor_Mask, result)); + + // Remove for writing + ACE_Reactor_Mask mask = ACE_Event_Handler::WRITE_MASK | ACE_Event_Handler::DONT_CALL; + result = ACE_Reactor::instance ()->remove_handler (this, + mask); + ACE_ASSERT (result == 0); + + // Write to the pipe; this causes handle_input to get called. + if (!write_to_pipe_in_main) + write_to_pipe (this->pipe_); + + return 0; + } + +protected: + ACE_Pipe &pipe_; +}; + + +// +// Selection of which reactor should get created +// +void +create_reactor (void) +{ + ACE_Reactor_Impl *impl = 0; + + if (opt_wfmo_reactor) + { +#if defined (ACE_WIN32) && !defined (ACE_HAS_WINCE) + ACE_NEW (impl, + ACE_WFMO_Reactor); +#endif /* ACE_WIN32 */ + } + else if (opt_select_reactor) + ACE_NEW (impl, + ACE_Select_Reactor); + + ACE_Reactor *reactor = 0; + ACE_NEW (reactor, + ACE_Reactor (impl)); + ACE_Reactor::instance (reactor); +} + +int +main (int argc, char *argv[]) +{ + int result = 0; + + // Parse args + ACE_Get_Opt getopt (argc, argv, ACE_TEXT ("swmc"), 1); + for (int c; (c = getopt ()) != -1; ) + switch (c) + { + case 's': + opt_select_reactor = 1; + break; + case 'w': + opt_wfmo_reactor = 1; + break; + case 'm': + write_to_pipe_in_main = 1; + break; + case 'c': + cancel_reads = 1; + break; + } + + // Create pipes + ACE_Pipe pipe1, pipe2; + + result = pipe1.open (); + ACE_ASSERT (result == 0); + + result = pipe2.open (); + ACE_ASSERT (result == 0); + + // Create handlers + Handler handler (pipe1); + Different_Handler different_handler (pipe2); + + // Create reactor + create_reactor (); + + // Manage memory automagically. + auto_ptr<ACE_Reactor> reactor (ACE_Reactor::instance ()); + auto_ptr<ACE_Reactor_Impl> impl; + + // If we are using other that the default implementation, we must + // clean up. + if (opt_select_reactor || opt_wfmo_reactor) + impl = auto_ptr<ACE_Reactor_Impl> (ACE_Reactor::instance ()->implementation ()); + + // Register handlers + ACE_Reactor_Mask handler_mask = + ACE_Event_Handler::READ_MASK | + ACE_Event_Handler::WRITE_MASK | + ACE_Event_Handler::EXCEPT_MASK; + + ACE_Reactor_Mask different_handler_mask = + ACE_Event_Handler::WRITE_MASK | + ACE_Event_Handler::EXCEPT_MASK; + + result = ACE_Reactor::instance ()->register_handler (&handler, + handler_mask); + ACE_ASSERT (result == 0); + + result = ACE_Reactor::instance ()->register_handler (&different_handler, + different_handler_mask); + ACE_ASSERT (result == 0); + + // Write to the pipe; this causes handle_input to get called. + if (write_to_pipe_in_main) + { + write_to_pipe (pipe1); + write_to_pipe (pipe2); + } + + // Note that handle_output will get called automatically since the + // pipe is writable! + + // Run for three seconds + ACE_Time_Value time (3); + ACE_Reactor::instance ()->run_event_loop (time); + + ACE_DEBUG ((LM_DEBUG, "\nClosing down the application\n\n")); + + return 0; +} diff --git a/examples/Reactor/WFMO_Reactor/test_multithreading.cpp b/examples/Reactor/WFMO_Reactor/test_multithreading.cpp new file mode 100644 index 00000000000..ee368d18c11 --- /dev/null +++ b/examples/Reactor/WFMO_Reactor/test_multithreading.cpp @@ -0,0 +1,245 @@ +// $Id$ +// +// ============================================================================ +// +// = LIBRARY +// examples +// +// = FILENAME +// test_multithreading.cpp +// +// = DESCRIPTION +// +// This application tests multiple threads simultaneously calling +// Reactor::handle_events(). It also shows how different threads +// can update the state of Reactor by registering and removing +// Event_Handlers. +// +// Note that this test will only work with WFMO_Reactor +// +// = AUTHOR +// Irfan Pyarali +// +// ============================================================================ + +#include "ace/Task.h" +#include "ace/Reactor.h" +#include "ace/WFMO_Reactor.h" +#include "ace/Get_Opt.h" + +ACE_RCSID(WFMO_Reactor, test_multithreading, "$Id$") + +static int concurrent_threads = 1; +static int number_of_handles = ACE_Reactor::instance ()->size (); +static int number_of_handles_to_signal = 1; +static int interval = 2; +static int iterations = 10; + +// Explain usage and exit. +static void +print_usage_and_die (void) +{ + ACE_DEBUG ((LM_DEBUG, + "usage: \n\t" + "[-t (# of threads - default 1)] \n\t" + "[-h (# of handlers) - default 62] \n\t" + "[-i (# time interval between signals) - default 2] \n\t" + "[-s (# of handles to signal) - default 1] \n\t" + "[-e (# of iterations) - default 10] \n\t")); + ACE_OS::exit (1); +} + +// Parse the command-line arguments and set options. +static void +parse_args (int argc, char **argv) +{ + ACE_Get_Opt get_opt (argc, argv, "t:h:s:i:e:"); + int c; + + while ((c = get_opt ()) != -1) + switch (c) + { + case 't': + concurrent_threads = ACE_OS::atoi (get_opt.opt_arg ()); + break; + case 'e': + iterations = ACE_OS::atoi (get_opt.opt_arg ()); + break; + case 'h': + number_of_handles = ACE_OS::atoi (get_opt.opt_arg ()); + break; + case 'i': + interval = ACE_OS::atoi (get_opt.opt_arg ()); + break; + case 's': + number_of_handles_to_signal = ACE_OS::atoi (get_opt.opt_arg ()); + break; + default: + print_usage_and_die (); + break; + } +} + +class Task_Handler : public ACE_Task<ACE_NULL_SYNCH> +{ +public: + Task_Handler (size_t number_of_handles, + size_t concurrent_threads); + // Constructor. + + ~Task_Handler (void); + // Destructor. + + virtual int handle_close (ACE_HANDLE handle, + ACE_Reactor_Mask close_mask); + // Called when object is removed from the ACE_Reactor + + int handle_signal (int signum, siginfo_t * = 0, ucontext_t * = 0); + // Handle events being signaled by the main thread. + + virtual int handle_timeout (const ACE_Time_Value &tv, + const void *arg = 0); + // Called when timer expires. + + int svc (void); + // Task event loop. + + int signal (size_t index); + // Signal an event. + +private: + ACE_Auto_Event *events_; +}; + +// All threads do reactor->handle_events () +int +Task_Handler::svc (void) +{ + // Try to become the owner + ACE_Reactor::instance ()->owner (ACE_Thread::self ()); + // Run the event loop. + return ACE_Reactor::run_event_loop (); +} + +Task_Handler::Task_Handler (size_t number_of_handles, + size_t concurrent_threads) +{ + ACE_NEW (this->events_, ACE_Auto_Event [number_of_handles]); + + for (size_t i = 1; i <= number_of_handles; i++) + if (ACE_Reactor::instance ()->register_handler (this, + this->events_[i].handle ()) == -1) + ACE_ERROR ((LM_ERROR, + "%p\t cannot register handle %d with Reactor\n", + "Task_Handler::Task_Handler", + i)); + + // Make us an active object. + if (this->activate (THR_NEW_LWP, + concurrent_threads) == -1) + ACE_ERROR ((LM_ERROR, "%p\t cannot activate task\n", + "activate")); +} + +Task_Handler::~Task_Handler (void) +{ + delete [] this->events_; +} + + +int +Task_Handler::handle_signal (int signum, siginfo_t *siginfo, ucontext_t *) +{ + // When signaled, print message, remove self, and add self + // This will force Reactor to update its internal handle tables + + if (ACE_Reactor::instance ()->remove_handler (siginfo->si_handle_, + ACE_Event_Handler::DONT_CALL) == -1) + return -1; + // ACE_ERROR_RETURN ((LM_ERROR, + // "(%t) %p\tTask cannot be unregistered from Reactor: handle value = %d\n", + // "Task_Handler::handle_signal", + // siginfo->si_handle_), -1); + + if (ACE_Reactor::instance ()->register_handler (this, + siginfo->si_handle_) == -1) + return -1; + // ACE_ERROR_RETURN ((LM_ERROR, + // "(%t) %p\tTask cannot be registered with Reactor: handle value = %d\n", + // "Task_Handler::handle_signal", + // siginfo->si_handle_), -1); + return 0; +} + +int +Task_Handler::handle_close (ACE_HANDLE handle, + ACE_Reactor_Mask close_mask) +{ + ACE_DEBUG ((LM_DEBUG, "(%t) handle_close() called: handle value = %d\n", + handle)); + return 0; +} + +int +Task_Handler::handle_timeout (const ACE_Time_Value &tv, + const void *arg) +{ + ACE_DEBUG ((LM_DEBUG, "(%t) handle_timeout() called: iteration value = %d\n", + int (arg))); + return 0; +} + +int +Task_Handler::signal (size_t index) +{ + return this->events_[index].signal (); +} + +int +main (int argc, char **argv) +{ + parse_args (argc, argv); + Task_Handler task (number_of_handles, + concurrent_threads); + + ACE_OS::srand (ACE_OS::time (0L)); + + for (int i = 1; i <= iterations; i++) + { + // Sleep for a while + ACE_OS::sleep (interval); + + // Randomly generate events + ACE_DEBUG ((LM_DEBUG, "********************************************************\n")); + ACE_DEBUG ((LM_DEBUG, "(%t -- main thread) signaling %d events : iteration = %d\n", + number_of_handles_to_signal, + i)); + ACE_DEBUG ((LM_DEBUG, "********************************************************\n")); + + // Setup a timer for the task + if (ACE_Reactor::instance ()->schedule_timer (&task, + (void *) i, + 0) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "schedule_timer"), -1); + + for (int i = 0; i < number_of_handles_to_signal; i++) + // Randomly select a handle to signal. + task.signal (ACE_OS::rand() % number_of_handles); + } + + // Sleep for a while + ACE_OS::sleep (interval); + + // End the Reactor event loop + ACE_Reactor::end_event_loop (); + + // Wait for all threads to exit + ACE_Thread_Manager::instance ()->wait (); + + // Close the Reactor singleton before exiting this function. + // If we wait for the Object Manager to do this, it will be too + // late since Task_Handler instance would have disappeared. + ACE_Reactor::close_singleton (); + + return 0; +} diff --git a/examples/Reactor/WFMO_Reactor/test_network_events.cpp b/examples/Reactor/WFMO_Reactor/test_network_events.cpp new file mode 100644 index 00000000000..3f3c7fdcd20 --- /dev/null +++ b/examples/Reactor/WFMO_Reactor/test_network_events.cpp @@ -0,0 +1,203 @@ +// $Id$ +// +// ============================================================================ +// +// = LIBRARY +// examples +// +// = FILENAME +// test_network_events.cpp +// +// = DESCRIPTION +// +// This application tests Reactor to make sure that it responds +// correctly to different kinds of network events. +// +// The test starts off by creating a Network_Listener, that listens +// for connections at ACE_DEFAULT_SERVER_PORT. When a client +// connects, a Network_Handler is created. Network_Handler reads +// messages off the socket and prints them out. This is done until +// the remote side shuts down. Multiple clients can connect at the +// same time. +// +// Events tested in this example includes ACCEPT, READ, and CLOSE masks. +// +// To run this example, start an instance of this example and +// connect to it using telnet (to port +// ACE_DEFAULT_SERVER_PORT(10002)). +// +// = AUTHOR +// Irfan Pyarali +// +// ============================================================================ + +#include "ace/Reactor.h" +#include "ace/WFMO_Reactor.h" +#include "ace/INET_Addr.h" +#include "ace/SOCK_Stream.h" +#include "ace/SOCK_Acceptor.h" + +ACE_RCSID(WFMO_Reactor, test_network_events, "$Id$") + +class Network_Handler : public ACE_Event_Handler +{ +public: + Network_Handler (ACE_SOCK_Stream &s); + // Default constructor + + virtual int handle_input (ACE_HANDLE handle); + virtual int handle_close (ACE_HANDLE handle, + ACE_Reactor_Mask close_mask); + virtual ACE_HANDLE get_handle (void) const; + + ACE_SOCK_Stream stream_; + +}; + +Network_Handler::Network_Handler (ACE_SOCK_Stream &s) + : stream_ (s) +{ + this->reactor (ACE_Reactor::instance ()); + + int result = this->reactor ()->register_handler (this, READ_MASK); + ACE_ASSERT (result == 0); +} + +ACE_HANDLE +Network_Handler::get_handle (void) const +{ + return this->stream_.get_handle (); +} + +int +Network_Handler::handle_input (ACE_HANDLE handle) +{ + ACE_DEBUG ((LM_DEBUG, "Network_Handler::handle_input handle = %d\n", handle)); + + while (1) + { + char message[BUFSIZ]; + int result = this->stream_.recv (message, sizeof message); + if (result > 0) + { + message[result] = 0; + ACE_DEBUG ((LM_DEBUG, "Remote message: %s\n", message)); + } + else if (result == 0) + { + ACE_DEBUG ((LM_DEBUG, "Connection closed\n")); + return -1; + } + else if (errno == EWOULDBLOCK) + { + return 0; + } + else + { + ACE_DEBUG ((LM_DEBUG, "Problems in receiving data, result = %d", result)); + return -1; + } + } +} + +int +Network_Handler::handle_close (ACE_HANDLE handle, + ACE_Reactor_Mask close_mask) +{ + ACE_DEBUG ((LM_DEBUG, "Network_Handler::handle_close handle = %d\n", handle)); + + this->stream_.close (); + delete this; + return 0; +} + +class Network_Listener : public ACE_Event_Handler +{ +public: + Network_Listener (void); + // Default constructor + ~Network_Listener (void); + // Default constructor + + virtual int handle_input (ACE_HANDLE handle); + virtual int handle_close (ACE_HANDLE handle, + ACE_Reactor_Mask close_mask); + ACE_HANDLE get_handle (void) const; + + ACE_INET_Addr local_address_; + ACE_SOCK_Acceptor acceptor_; +}; + +Network_Listener::Network_Listener (void) + : local_address_ (ACE_DEFAULT_SERVER_PORT), + acceptor_ (local_address_, 1) +{ + this->reactor (ACE_Reactor::instance ()); + int result = this->reactor ()->register_handler (this, + ACE_Event_Handler::ACCEPT_MASK); + ACE_ASSERT (result == 0); +} + +Network_Listener::~Network_Listener (void) +{ + this->reactor ()->remove_handler (this, ACE_Event_Handler::ACCEPT_MASK || + ACE_Event_Handler::DONT_CALL); + this->handle_close (this->get_handle (), ACE_Event_Handler::ALL_EVENTS_MASK); +} + +ACE_HANDLE +Network_Listener::get_handle (void) const +{ + return this->acceptor_.get_handle (); +} + +int +Network_Listener::handle_input (ACE_HANDLE handle) +{ + ACE_DEBUG ((LM_DEBUG, "Network_Listener::handle_input handle = %d\n", handle)); + + ACE_INET_Addr remote_address; + ACE_SOCK_Stream stream; + + // Try to find out if the implementation of the reactor that we are + // using requires us to reset the event association for the newly + // created handle. This is because the newly created handle will + // inherit the properties of the listen handle, including its event + // associations. + int reset_new_handle = this->reactor ()->uses_event_associations (); + + int result = this->acceptor_.accept (stream, // stream + &remote_address, // remote address + 0, // timeout + 1, // restart + reset_new_handle); // reset new handler + ACE_ASSERT (result == 0); + + ACE_DEBUG ((LM_DEBUG, "Remote connection from: ")); + remote_address.dump (); + + Network_Handler *handler; + ACE_NEW_RETURN (handler, Network_Handler (stream), -1); + + return 0; +} + +int +Network_Listener::handle_close (ACE_HANDLE handle, + ACE_Reactor_Mask close_mask) +{ + ACE_DEBUG ((LM_DEBUG, "Network_Listener::handle_close handle = %d\n", handle)); + + this->acceptor_.close (); + return 0; +} + +int +main (int, char *[]) +{ + Network_Listener listener; + + ACE_Reactor::run_event_loop (); + + return 0; +}; diff --git a/examples/Reactor/WFMO_Reactor/test_prerun_state_changes.cpp b/examples/Reactor/WFMO_Reactor/test_prerun_state_changes.cpp new file mode 100644 index 00000000000..fb880b8c8ee --- /dev/null +++ b/examples/Reactor/WFMO_Reactor/test_prerun_state_changes.cpp @@ -0,0 +1,64 @@ +// $Id$ +// +// ============================================================================ +// +// = LIBRARY +// examples +// +// = FILENAME +// test_prerun_state_changes.cpp +// +// = DESCRIPTION +// +// Tests the Reactor's ability to handle state changes before +// getting a chance to run. +// +// = AUTHOR +// +// Irfan Pyarali +// +// ============================================================================ + +#include "ace/Reactor.h" + +ACE_RCSID(WFMO_Reactor, test_prerun_state_changes, "$Id$") + +class Event_Handler : public ACE_Event_Handler +// = TITLE +// Generic Event Handler. +// +{ +public: + virtual int handle_close (ACE_HANDLE handle, ACE_Reactor_Mask mask) + { + ACE_DEBUG ((LM_DEBUG, + "event handler %d closed.\n", + (int) handle)); + delete this; + return 0; + } +}; + +int +main (int argc, char *argv[]) +{ + ACE_HANDLE handle = (ACE_HANDLE) ::socket (PF_INET, SOCK_STREAM, 0); + + Event_Handler *event_handler = new Event_Handler; + + int result = ACE_Reactor::instance ()->register_handler (handle, + event_handler, + ACE_Event_Handler::READ_MASK); + ACE_ASSERT (result == 0); + + result = ACE_Reactor::instance ()->register_handler (handle, + event_handler, + ACE_Event_Handler::WRITE_MASK | ACE_Event_Handler::QOS_MASK); + ACE_ASSERT (result == 0); + + result = ACE_Reactor::instance ()->remove_handler (handle, + ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL); + ACE_ASSERT (result == 0); + + return 0; +} diff --git a/examples/Reactor/WFMO_Reactor/test_registration.cpp b/examples/Reactor/WFMO_Reactor/test_registration.cpp new file mode 100644 index 00000000000..6409efc2749 --- /dev/null +++ b/examples/Reactor/WFMO_Reactor/test_registration.cpp @@ -0,0 +1,155 @@ +// $Id$ +// +// ============================================================================ +// +// = LIBRARY +// examples +// +// = FILENAME +// test_registration.cpp +// +// = DESCRIPTION +// +// This test application tests a wide range of registration, +// suspension, resumption, and removal of events from Reactor. +// +// The application initially registers two events with Reactor. A +// auxiliary thread is created to do the signaling on the +// events. When the first event is signaled, the event is suspended +// from Reactor. The event is then signaled again, but is "lost" +// since the handler has been suspended. When the second event is +// signal, the first event is resumed and the second is +// suspended. When the first event is signaled again, both events +// are removed from Reactor. +// +// This test shows off the following features of Reactor: +// - Registration +// - Suspension +// - Resumption +// - Removal (while active and while suspended) +// +// = AUTHOR +// Irfan Pyarali +// +// ============================================================================ + +#include "ace/Reactor.h" + +ACE_RCSID(WFMO_Reactor, test_registration, "$Id$") + +// Globals for this test +int stop_test = 0; +ACE_Reactor reactor; + + +class Simple_Handler : public ACE_Event_Handler +{ +public: + Simple_Handler (void); + // Default constructor + + virtual int handle_signal (int signum, siginfo_t * = 0, ucontext_t * = 0); + virtual int handle_close (ACE_HANDLE handle, + ACE_Reactor_Mask close_mask); + + ACE_Auto_Event event1_; + ACE_Auto_Event event2_; + int handle_signal_count_; + int handle_close_count_; +}; + +Simple_Handler::Simple_Handler (void) + : handle_signal_count_ (0), + handle_close_count_ (0) +{ +} + +int +Simple_Handler::handle_signal (int signum, siginfo_t *s, ucontext_t *) +{ + ACE_HANDLE handle = s->si_handle_; + + this->handle_signal_count_++; + + if (this->handle_signal_count_ == 1) + this->reactor ()->suspend_handler (event1_.handle ()); + else if (this->handle_signal_count_ == 2) + { + this->reactor ()->resume_handler (event1_.handle ()); + this->reactor ()->suspend_handler (event2_.handle ()); + } + else if (this->handle_signal_count_ == 3) + { + this->reactor ()->remove_handler (event1_.handle (), + ACE_Event_Handler::NULL_MASK); + this->reactor ()->remove_handler (event2_.handle (), + ACE_Event_Handler::NULL_MASK); + } + return 0; +} + +int +Simple_Handler::handle_close (ACE_HANDLE handle, + ACE_Reactor_Mask close_mask) +{ + ACE_DEBUG ((LM_DEBUG, "Simple_Handler::handle_close handle = %d\n", handle)); + this->handle_close_count_++; + + if (this->handle_close_count_ == 1) + stop_test = 0; + else if (this->handle_close_count_ == 2) + stop_test = 1; + + return 0; +} + +// Globals for this test +Simple_Handler simple_handler; + +void +worker (void) +{ + ACE_DEBUG ((LM_DEBUG, "(%t) Thread creation\n")); + ACE_DEBUG ((LM_DEBUG, "(%t) Thread sleeping\n")); + ACE_OS::sleep (1); + ACE_DEBUG ((LM_DEBUG, "(%t) Thread signaling %d\n", simple_handler.event1_.handle())); + simple_handler.event1_.signal (); + ACE_DEBUG ((LM_DEBUG, "(%t) Thread sleeping\n")); + ACE_OS::sleep (1); + ACE_DEBUG ((LM_DEBUG, "(%t) Thread signaling %d\n", simple_handler.event1_.handle())); + ACE_DEBUG ((LM_DEBUG, "Note: This signal should be \"lost\" because of the suspended handler\n")); + simple_handler.event1_.signal (); + ACE_DEBUG ((LM_DEBUG, "(%t) Thread sleeping\n")); + ACE_OS::sleep (1); + ACE_DEBUG ((LM_DEBUG, "(%t) Thread resetting %d\n", simple_handler.event1_.handle())); + simple_handler.event1_.reset (); + ACE_DEBUG ((LM_DEBUG, "(%t) Thread signaling %d\n", simple_handler.event2_.handle())); + simple_handler.event2_.signal (); + ACE_DEBUG ((LM_DEBUG, "(%t) Thread sleeping\n")); + ACE_OS::sleep (1); + ACE_DEBUG ((LM_DEBUG, "(%t) Thread signaling %d\n", simple_handler.event1_.handle())); + simple_handler.event1_.signal (); + ACE_DEBUG ((LM_DEBUG, "(%t) Thread death\n")); +} + +int +main (int, char *[]) +{ + int result = reactor.register_handler (&simple_handler, + simple_handler.event1_.handle ()); + ACE_ASSERT (result == 0); + + result = reactor.register_handler (&simple_handler, + simple_handler.event2_.handle ()); + ACE_ASSERT (result == 0); + + result = ACE_OS::thr_create ((ACE_THR_FUNC) worker, 0, 0, 0); + ACE_ASSERT (result == 0); + + result = 0; + while (!stop_test && result != -1) + { + result = reactor.handle_events (); + } + return 0; +}; diff --git a/examples/Reactor/WFMO_Reactor/test_registry_changes.cpp b/examples/Reactor/WFMO_Reactor/test_registry_changes.cpp new file mode 100644 index 00000000000..1dd25680a1f --- /dev/null +++ b/examples/Reactor/WFMO_Reactor/test_registry_changes.cpp @@ -0,0 +1,133 @@ +// $Id$ +// +// ============================================================================ +// +// = LIBRARY +// examples +// +// = FILENAME +// test_registry_changes.cpp +// +// = DESCRIPTION +// +// This application tests the working of Reactor when users are +// interested in monitoring changes in the registry. +// +// = AUTHOR +// Irfan Pyarali +// +// ============================================================================ + +#include "ace/Reactor.h" +#include "ace/Registry.h" + +ACE_RCSID(WFMO_Reactor, test_registry_changes, "$Id$") + +static int stop_test = 0; +static HKEY context_to_monitor = HKEY_CURRENT_USER; +static const ACE_TCHAR *temp_context_name = ACE_TEXT ("ACE temporary context"); + +class Event_Handler : public ACE_Event_Handler +{ +public: + Event_Handler (ACE_Reactor &reactor); + ~Event_Handler (void); + int handle_signal (int signum, siginfo_t * = 0, ucontext_t * = 0); + int handle_close (ACE_HANDLE handle, + ACE_Reactor_Mask close_mask); + ACE_Registry::Naming_Context &context (void); + +private: + ACE_Auto_Event event_; + ACE_Registry::Naming_Context context_; +}; + +Event_Handler::Event_Handler (ACE_Reactor &reactor) + : context_ (context_to_monitor) +{ + this->reactor (&reactor); + + if (::RegNotifyChangeKeyValue (this->context_.key (), // handle of key to watch + FALSE, // flag for subkey notification + REG_NOTIFY_CHANGE_NAME, // changes to be reported + this->event_.handle (), // handle of signaled event + TRUE // flag for asynchronous reporting + ) != ERROR_SUCCESS) + ACE_ERROR ((LM_ERROR, "RegNotifyChangeKeyValue could not be setup\n")); + + if (this->reactor ()->register_handler (this, + this->event_.handle ()) != 0) + ACE_ERROR ((LM_ERROR, "Registration with Reactor could not be done\n")); +} + +Event_Handler::~Event_Handler (void) +{ +} + +int +Event_Handler::handle_signal (int signum, siginfo_t *, ucontext_t *) +{ + if (stop_test) + this->reactor ()->close (); + else if (::RegNotifyChangeKeyValue (this->context_.key (), // handle of key to watch + FALSE, // flag for subkey notification + REG_NOTIFY_CHANGE_NAME, // changes to be reported + this->event_.handle (), // handle of signaled event + TRUE // flag for asynchronous reporting + ) != ERROR_SUCCESS) + ACE_ERROR ((LM_ERROR, + "RegNotifyChangeKeyValue could not be setup\n")); + return 0; +} + +int +Event_Handler::handle_close (ACE_HANDLE handle, + ACE_Reactor_Mask close_mask) +{ + ACE_DEBUG ((LM_DEBUG, "Event_Handler removed from Reactor\n")); + return 0; +} + +ACE_Registry::Naming_Context & +Event_Handler::context (void) +{ + return this->context_; +} + +void +worker (Event_Handler *event_handler) +{ + ACE_DEBUG ((LM_DEBUG, "(%t) Thread creation\n")); + ACE_DEBUG ((LM_DEBUG, "(%t) Thread creating temporary registry entry\n")); + + ACE_Registry::Naming_Context temp_context; + int result = event_handler->context ().bind_new_context (temp_context_name, + temp_context); + + if (result == -1) + ACE_ERROR ((LM_ERROR, "Error in creating %s: %p\n", temp_context_name, "bind_new_context")); + else + { + ACE_DEBUG ((LM_DEBUG, "(%t) Thread sleeping\n")); + ACE_OS::sleep (3); + + ACE_DEBUG ((LM_DEBUG, "(%t) Thread removing registry entry\n")); + stop_test = 1; + event_handler->context ().unbind_context (temp_context_name); + } +} + +int +main (int, char *[]) +{ + ACE_Reactor reactor; + Event_Handler handler (reactor); + + int result = ACE_OS::thr_create ((ACE_THR_FUNC) worker, &handler, 0, 0); + ACE_ASSERT (result == 0); + + for (result = 0; result != -1; result = reactor.handle_events ()) + continue; + + return 0; +} diff --git a/examples/Reactor/WFMO_Reactor/test_removals.cpp b/examples/Reactor/WFMO_Reactor/test_removals.cpp new file mode 100644 index 00000000000..1c4b4acf687 --- /dev/null +++ b/examples/Reactor/WFMO_Reactor/test_removals.cpp @@ -0,0 +1,103 @@ +// $Id$ +// +// ============================================================================ +// +// = LIBRARY +// examples +// +// = FILENAME +// test_removals.cpp +// +// = DESCRIPTION +// +// Tests the Reactor's ability to handle simultaneous events. If +// you pass anything on the command-line, then each handler +// requests to be removed from the Reactor after each event. +// +// = AUTHOR +// Tim Harrison +// Irfan Pyarali +// +// ============================================================================ + +#include "ace/Reactor.h" +#include "ace/Service_Config.h" +#include "ace/Synch.h" + +ACE_RCSID(WFMO_Reactor, test_removals, "$Id$") + +class Event_Handler : public ACE_Event_Handler +// = TITLE +// Generic Event Handler. +// +// = DESCRIPTION +// +// Creates event. Registers with Reactor. Signals event. If +// created with -close_down- it returns -1 from handle signal. +{ +public: + Event_Handler (int event_number, + int close_down) + : event_number_ (event_number), + close_down_ (close_down) + { + if (ACE_Reactor::instance ()->register_handler (this, + this->event_.handle ()) == -1) + ACE_ERROR ((LM_ERROR, "%p\tevent handler %d cannot be added to Reactor\n", "", event_number_)); + this->event_.signal (); + } + + virtual int handle_signal (int index, siginfo_t *, ucontext_t *) + { + if (this->close_down_) + return -1; + else + return 0; + } + + virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask) + { + ACE_DEBUG ((LM_DEBUG, "event handler %d closed.\n", event_number_)); + delete this; + return 0; + } + + virtual ACE_HANDLE get_handle (void) const + { + return event_.handle (); + } + +private: + int event_number_; + // Our event number. + + int close_down_; + // Shall we close down or not. + + ACE_Event event_; + // Signaled to shut down the handler. +}; + +int +main (int argc, char *argv[]) +{ + int close_down = argc > 1 ? 1 : 0; + + for (size_t i = 1; i <= ACE_Reactor::instance ()->size (); i++) + new Event_Handler (i, close_down); + + int result = 0; + ACE_Time_Value time (1); + while (1) + { + result = ACE_Reactor::instance ()->handle_events (time); + if (result == 0 && errno == ETIME) + { + ACE_DEBUG ((LM_DEBUG, "No more work left: timing out\n")); + break; + } + if (result == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p.\n", "main"), -1); + } + return 0; +} diff --git a/examples/Reactor/WFMO_Reactor/test_suspended_removals.cpp b/examples/Reactor/WFMO_Reactor/test_suspended_removals.cpp new file mode 100644 index 00000000000..4dc7cc33bb7 --- /dev/null +++ b/examples/Reactor/WFMO_Reactor/test_suspended_removals.cpp @@ -0,0 +1,163 @@ +// $Id$ +// +// ============================================================================ +// +// = LIBRARY +// examples +// +// = FILENAME +// test_suspended_removals.cpp +// +// = DESCRIPTION +// +// Tests the Reactor's ability to handle removal of suspended +// handles. +// +// = AUTHOR +// Irfan Pyarali +// +// ============================================================================ + +#include "ace/Reactor.h" +#include "ace/WFMO_Reactor.h" +#include "ace/Synch.h" + +ACE_RCSID(WFMO_Reactor, test_suspended_removals, "$Id$") + +class Event_Handler : public ACE_Event_Handler +{ +public: + + ACE_HANDLE get_handle (void) const + { + return this->event_.handle (); + } + + ACE_Event event_; +}; + +class ACE_WFMO_Reactor_Test +{ +public: + static void check_for_valid_state (ACE_WFMO_Reactor &wfmo_reactor, + size_t handles_to_be_added, + size_t handles_to_be_suspended, + size_t handles_to_be_resumed, + size_t handles_to_be_deleted) + { + ACE_ASSERT (wfmo_reactor.handler_rep_.handles_to_be_added_ == handles_to_be_added); + ACE_ASSERT (wfmo_reactor.handler_rep_.handles_to_be_suspended_ == handles_to_be_suspended); + ACE_ASSERT (wfmo_reactor.handler_rep_.handles_to_be_resumed_ == handles_to_be_resumed); + ACE_ASSERT (wfmo_reactor.handler_rep_.handles_to_be_deleted_ == handles_to_be_deleted); + } +}; + +int +main (int argc, char *argv[]) +{ + Event_Handler handler; + ACE_WFMO_Reactor reactor; + ACE_Reactor base_reactor (&reactor); + ACE_Time_Value time (1); + + int result = + reactor.register_handler (&handler); + ACE_ASSERT (result == 0); + + ACE_WFMO_Reactor_Test::check_for_valid_state (reactor, + 1, 0, 0, 0); + + result = + reactor.remove_handler (&handler, + ACE_Event_Handler::DONT_CALL | + ACE_Event_Handler::ALL_EVENTS_MASK); + ACE_ASSERT (result == 0); + + ACE_WFMO_Reactor_Test::check_for_valid_state (reactor, + 1, 0, 0, 1); + + result = base_reactor.run_reactor_event_loop (time); + ACE_ASSERT (result != -1); + + ACE_WFMO_Reactor_Test::check_for_valid_state (reactor, + 0, 0, 0, 0); + + result = + reactor.register_handler (&handler); + ACE_ASSERT (result == 0); + + ACE_WFMO_Reactor_Test::check_for_valid_state (reactor, + 1, 0, 0, 0); + + result = base_reactor.run_reactor_event_loop (time); + ACE_ASSERT (result != -1); + + ACE_WFMO_Reactor_Test::check_for_valid_state (reactor, + 0, 0, 0, 0); + + result = + reactor.suspend_handler (&handler); + ACE_ASSERT (result == 0); + + ACE_WFMO_Reactor_Test::check_for_valid_state (reactor, + 0, 1, 0, 0); + + result = + reactor.remove_handler (&handler, + ACE_Event_Handler::DONT_CALL | + ACE_Event_Handler::ALL_EVENTS_MASK); + ACE_ASSERT (result == 0); + + ACE_WFMO_Reactor_Test::check_for_valid_state (reactor, + 0, 0, 0, 1); + + result = base_reactor.run_reactor_event_loop (time); + ACE_ASSERT (result != -1); + + ACE_WFMO_Reactor_Test::check_for_valid_state (reactor, + 0, 0, 0, 0); + + result = + reactor.register_handler (&handler); + ACE_ASSERT (result == 0); + + ACE_WFMO_Reactor_Test::check_for_valid_state (reactor, + 1, 0, 0, 0); + + result = + reactor.suspend_handler (&handler); + ACE_ASSERT (result == 0); + + ACE_WFMO_Reactor_Test::check_for_valid_state (reactor, + 1, 1, 0, 0); + + result = base_reactor.run_reactor_event_loop (time); + ACE_ASSERT (result != -1); + + ACE_WFMO_Reactor_Test::check_for_valid_state (reactor, + 0, 0, 0, 0); + + result = + reactor.resume_handler (&handler); + ACE_ASSERT (result == 0); + + ACE_WFMO_Reactor_Test::check_for_valid_state (reactor, + 0, 0, 1, 0); + + result = + reactor.remove_handler (&handler, + ACE_Event_Handler::DONT_CALL | + ACE_Event_Handler::ALL_EVENTS_MASK); + ACE_ASSERT (result == 0); + + ACE_WFMO_Reactor_Test::check_for_valid_state (reactor, + 0, 0, 0, 1); + + result = base_reactor.run_reactor_event_loop (time); + ACE_ASSERT (result != -1); + + ACE_WFMO_Reactor_Test::check_for_valid_state (reactor, + 0, 0, 0, 0); + + return 0; +} diff --git a/examples/Reactor/WFMO_Reactor/test_talker.cpp b/examples/Reactor/WFMO_Reactor/test_talker.cpp new file mode 100644 index 00000000000..84ccb8a78b6 --- /dev/null +++ b/examples/Reactor/WFMO_Reactor/test_talker.cpp @@ -0,0 +1,559 @@ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// examples +// +// = FILENAME +// test_talker.cpp +// +// = DESCRIPTION +// +// This test application tests a wide range of events that can be +// demultiplexed using various ACE utilities. Events used include +// ^C events, reading from STDIN, vanilla Win32 events, thread +// exits, Reactor notifications, proactive reads, and proactive +// writes. +// +// The proactive I/O events are demultiplexed by the ACE_Proactor. +// The thread exits, notications, and vanilla Win32 events are +// demultiplexed by the ACE_Reactor. To enable a single thread +// to run all these events, the Proactor is integrated with the +// Reactor. +// +// The test application prototypes a simple talk program. Two +// instances of the application connect. Input from either console +// is displayed on the others console also. Because of the evils +// of Win32 STDIN, a separate thread is used to read from STDIN. +// To test the Proactor and Reactor, I/O between the remote +// processes is performed proactively and interactions between the +// STDIN thread and the main thread are performed reactively. +// +// The following description of the test application is in two +// parts. The participants section explains the main components +// involved in the application. The collaboration section +// describes how the partipants interact in response to the +// multiple event types which occur. +// +// The Reactor test application has the following participants: +// +// . Reactor -- The Reactor demultiplexes Win32 "waitable" +// events using WaitForMultipleObjects. +// +// . Proactor -- The proactor initiates and demultiplexes +// overlapped I/O operations. The Proactor registers with the +// Reactor so that a single-thread can demultiplex all +// application events. +// +// . STDIN_Handler -- STDIN_Handler is an Active Object which reads +// from STDIN and forwards the input to the Peer_Handler. This +// runs in a separate thread to make the test more interesting. +// However, STDIN is "waitable", so in general it can be waited on +// by the ACE Reactor, thanks MicroSlush! +// +// . Peer_Handler -- The Peer_Handler connects to another instance +// of test_reactor. It Proactively reads and writes data to the +// peer. When the STDIN_Handler gives it messages, it fowards them +// to the remote peer. When it receives messages from the remote +// peer, it prints the output to the console. +// +// The collaborations of the participants are as follows: +// +// . Initialization +// +// Peer_Handler -- connects to the remote peer. It then begins +// proactively reading from the remote connection. Note that it +// will be notified by the Proactor when a read completes. It +// also registers a notification strategy with message queue so +// that it is notified when the STDIN_Handler posts a message +// onto the queue. +// +// STDIN_Handler -- STDIN_Handler registers a signal handler for +// SIGINT. This just captures the exception so that the kernel +// doesn't kill our process; We want to exit gracefully. It also +// creates an Exit_Hook object which registers the +// STDIN_Handler's thread handle with the Reactor. The +// Exit_Hook will get called back when the STDIN_Handler thread +// exits. After registering these, it blocks reading from STDIN. +// +// Proactor -- is registered with the Reactor. +// +// The main thread of control waits in the Reactor. +// +// . STDIN events -- When the STDIN_Handler thread reads from +// STDIN, it puts the message on Peer_Handler's message queue. It +// then returns to reading from STDIN. +// +// . Message enqueue -- The Reactor thread wakes up and calls +// Peer_Handler::handle_output. The Peer_Handler then tries to +// dequeue a message from its message queue. If it can, the +// message is Proactively sent to the remote peer. Note that the +// Peer_Handler will be notified with this operation is complete. +// The Peer_Handler then falls back into the Reactor event loop. +// +// . Send complete event -- When a proactive send is complete, the +// Proactor is notified by the Reactor. The Proactor, in turn, +// notifies the Peer_Handler. The Peer_Handler then checks for +// more messages from the message queue. If there are any, it +// tries to send them. If there are not, it returns to the +// Reactor event loop. +// +// . Read complete event -- When a proactive read is complete (the +// Peer_Handler initiated a proactive read when it connected to the +// remote peer), the Proactor is notified by the Reactor. The +// Proactor, in turn notifies the Peer_Handler. If the read was +// successful the Peer_Handler just displays the received msg to +// the console and reinvokes a proactive read from the network +// connection. If the read failed (i.e. the remote peer exited), +// the Peer_Handler sets a flag to end the event loop and returns. +// This will cause the application to exit. +// +// . ^C events -- When the user types ^C at the console, the +// STDIN_Handler's signal handler will be called. It does nothing, +// but as a result of the signal, the STDIN_Handler thread will +// exit. +// +// . STDIN_Handler thread exits -- The Exit_Hook will get called +// back from the Reactor. Exit_Hook::handle_signal sets a flag +// to end the event loop and returns. This will cause the +// application to exit. +// +// +// To run example, start an instance of the test with an optional +// local port argument (as the acceptor). Start the other instance +// with -h <hostname> and -p <server port>. Type in either the +// client or server windows and your message should show up in the +// other window. Control C to exit. +// +// = AUTHOR +// Tim Harrison +// Irfan Pyarali +// +// ============================================================================ + +#include "ace/Reactor.h" +#include "ace/Reactor_Notification_Strategy.h" +#include "ace/WIN32_Proactor.h" +#include "ace/Proactor.h" +#include "ace/SOCK_Connector.h" +#include "ace/SOCK_Acceptor.h" +#include "ace/Get_Opt.h" +#include "ace/Service_Config.h" +#include "ace/Synch.h" +#include "ace/Task.h" + +ACE_RCSID(WFMO_Reactor, test_talker, "$Id$") + +typedef ACE_Task<ACE_MT_SYNCH> MT_TASK; + +class Peer_Handler : public MT_TASK, public ACE_Handler +// = TITLE +// Connect to a server. Receive messages from STDIN_Handler +// and forward them to the server using proactive I/O. +{ +public: + // = Initialization methods. + Peer_Handler (int argc, char *argv[]); + ~Peer_Handler (void); + + int open (void * =0); + // This method creates the network connection to the remote peer. + // It does blocking connects and accepts depending on whether a + // hostname was specified from the command line. + + virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result); + // This method will be called when an asynchronous read completes on a stream. + // The remote peer has sent us something. If it succeeded, print + // out the message and reinitiate a read. Otherwise, fail. In both + // cases, delete the message sent. + + virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result); + // This method will be called when an asynchronous write completes on a strea_m. + // One of our asynchronous writes to the remote peer has completed. + // Make sure it succeeded and then delete the message. + + virtual ACE_HANDLE handle (void) const; + // Get the I/O handle used by this <handler>. This method will be + // called by the ACE_Asynch_* classes when an ACE_INVALID_HANDLE is + // passed to <open>. + + virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask); + // We've been removed from the Reactor. + + virtual int handle_output (ACE_HANDLE fd); + // Called when output events should start. Note that this is + // automatically invoked by the + // <ACE_Reactor_Notificiation_Strategy>. + +private: + ACE_SOCK_Stream stream_; + // Socket that we have connected to the server. + + ACE_Reactor_Notification_Strategy strategy_; + // The strategy object that the reactor uses to notify us when + // something is added to the queue. + + // = Remote peer info. + char *host_; + // Name of remote host. + + u_short port_; + // Port number for remote host. + + ACE_Asynch_Read_Stream rd_stream_; + // Read stream + + ACE_Asynch_Write_Stream wr_stream_; + // Write stream + + ACE_Message_Block mb_; + // Message Block for reading from the network +}; + +class STDIN_Handler : public ACE_Task<ACE_NULL_SYNCH> +// = TITLE +// Active Object. Reads from STDIN and passes message blocks to +// the peer handler. +{ +public: + STDIN_Handler (MT_TASK &ph); + // Initialization. + + virtual int open (void * = 0); + // Activate object. + + virtual int close (u_long = 0); + // Shut down. + + int svc (void); + // Thread runs here as an active object. + +private: + static void handler (int signum); + // Handle a ^C. (Do nothing, this just illustrates how we can catch + // signals along with the other things). + + void register_thread_exit_hook (void); + // Helper function to register with the Reactor for thread exit. + + virtual int handle_signal (int index, siginfo_t *, ucontext_t *); + // The STDIN thread has exited. This means the user hit ^C. We can + // end the event loop. + + MT_TASK &ph_; + // Send all input to ph_. + + ACE_HANDLE thr_handle_; + // Handle of our thread. +}; + +Peer_Handler::Peer_Handler (int argc, char *argv[]) + : host_ (0), + port_ (ACE_DEFAULT_SERVER_PORT), + strategy_ (ACE_Reactor::instance (), + this, + ACE_Event_Handler::WRITE_MASK), + mb_ (BUFSIZ) +{ + // This code sets up the message to notify us when a new message is + // added to the queue. Actually, the queue notifies Reactor which + // then notifies us. + this->msg_queue ()->notification_strategy (&this->strategy_); + + ACE_Get_Opt get_opt (argc, argv, "h:p:"); + int c; + + while ((c = get_opt ()) != EOF) + { + switch (c) + { + case 'h': + host_ = get_opt.opt_arg (); + break; + case 'p': + port_ = ACE_OS::atoi (get_opt.opt_arg ()); + break; + } + } +} + +Peer_Handler::~Peer_Handler (void) +{ +} + +// This method creates the network connection to the remote peer. It +// does blocking connects and accepts depending on whether a hostname +// was specified from the command line. + +int +Peer_Handler::open (void *) +{ + if (host_ != 0) // Connector + { + ACE_INET_Addr addr (port_, host_); + ACE_SOCK_Connector connector; + + // Establish connection with server. + if (connector.connect (stream_, addr) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "connect"), -1); + + ACE_DEBUG ((LM_DEBUG, "(%t) connected.\n")); + } + else // Acceptor + { + ACE_SOCK_Acceptor acceptor; + ACE_INET_Addr local_addr (port_); + + if ((acceptor.open (local_addr) == -1) || + (acceptor.accept (this->stream_) == -1)) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "accept failed"), -1); + + ACE_DEBUG ((LM_DEBUG, "(%t) accepted.\n")); + } + + int result = this->rd_stream_.open (*this); + if (result != 0) + return result; + + result = this->wr_stream_.open (*this); + if (result != 0) + return result; + + result = this->rd_stream_.read (this->mb_, + this->mb_.size ()); + return result; +} + +// One of our asynchronous writes to the remote peer has completed. +// Make sure it succeeded and then delete the message. + +void +Peer_Handler::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result) +{ + if (result.bytes_transferred () <= 0) + ACE_DEBUG ((LM_DEBUG, "(%t) %p bytes = %d\n", "Message failed", + result.bytes_transferred ())); + + // This was allocated by the STDIN_Handler, queued, dequeued, passed + // to the proactor, and now passed back to us. + result.message_block ().release (); +} + +// The remote peer has sent us something. If it succeeded, print +// out the message and reinitiate a read. Otherwise, fail. In both +// cases, delete the message sent. + + +void +Peer_Handler::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result) +{ + if (result.bytes_transferred () > 0 && + this->mb_.length () > 0) + { + this->mb_.rd_ptr ()[result.bytes_transferred ()] = '\0'; + // Print out the message received from the server. + ACE_DEBUG ((LM_DEBUG, "%s", this->mb_.rd_ptr ())); + } + else + { + // If a read failed, we will assume it's because the remote peer + // went away. We will end the event loop. Since we're in the + // main thread, we don't need to do a notify. + ACE_Reactor::end_event_loop(); + return; + } + + // Reset pointers + this->mb_.wr_ptr (this->mb_.wr_ptr () - result.bytes_transferred ()); + + // Start off another read + if (this->rd_stream_.read (this->mb_, + this->mb_.size ()) == -1) + ACE_ERROR ((LM_ERROR, "%p Read initiate.\n", "Peer_Handler")); +} + +// This is so the Proactor can get our handle. +ACE_HANDLE +Peer_Handler::handle (void) const +{ + return this->stream_.get_handle (); +} + +// We've been removed from the Reactor. +int +Peer_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask) +{ + ACE_DEBUG ((LM_DEBUG, "(%t) Peer_Handler closing down\n")); + return 0; +} + +// New stuff added to the message queue. Try to dequeue a message. +int +Peer_Handler::handle_output (ACE_HANDLE fd) +{ + ACE_Message_Block *mb; + + ACE_Time_Value tv (ACE_Time_Value::zero); + + // Forward the message to the remote peer receiver. + if (this->getq (mb, &tv) != -1) + { + if (this->wr_stream_.write (*mb, + mb->length ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p Write initiate.\n", "Peer_Handler"), -1); + } + return 0; +} + +void +STDIN_Handler::handler (int signum) +{ + ACE_DEBUG ((LM_DEBUG, "(%t) signal = %S\n", signum)); +} + +STDIN_Handler::STDIN_Handler (MT_TASK &ph) + : ph_ (ph) +{ + // Register for ^C from the console. We just need to catch the + // exception so that the kernel doesn't kill our process. + // Registering this signal handler just tells the kernel that we + // know what we're doing; to leave us alone. + + ACE_OS::signal (SIGINT, (ACE_SignalHandler) STDIN_Handler::handler); +}; + +// Activate object. + +int +STDIN_Handler::open (void *) +{ + if (this->activate (THR_NEW_LWP | THR_DETACHED) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "spawn"), -1); + + return 0; +} + +// Shut down. + +int +STDIN_Handler::close (u_long) +{ + ACE_DEBUG ((LM_DEBUG, "(%t) thread is exiting.\n")); + return 0; +} + +// Thread runs here. + +int +STDIN_Handler::svc (void) +{ + this->register_thread_exit_hook (); + + for (;;) + { + ACE_Message_Block *mb = new ACE_Message_Block (BUFSIZ); + + // Read from stdin into mb. + int read_result = ACE_OS::read (ACE_STDIN, + mb->rd_ptr (), + mb->size ()); + + // If read succeeds, put mb to peer handler, else end the loop. + if (read_result > 0) + { + mb->wr_ptr (read_result); + // Note that this call will first enqueue mb onto the peer + // handler's message queue, which will then turn around and + // notify the Reactor via the Notification_Strategy. This + // will subsequently signal the Peer_Handler, which will + // react by calling back to its handle_output() method, + // which dequeues the message and sends it to the peer + // across the network. + this->ph_.putq (mb); + } + else + { + mb->release (); + break; + } + } + + // handle_signal will get called on the main proactor thread since + // we just exited and the main thread is waiting on our thread exit. + return 0; +} + +// Register an exit hook with the reactor. + +void +STDIN_Handler::register_thread_exit_hook (void) +{ + // Get a real handle to our thread. + ACE_Thread_Manager::instance ()->thr_self (this->thr_handle_); + + // Register ourselves to get called back when our thread exits. + + if (ACE_Reactor::instance ()-> + register_handler (this, this->thr_handle_) == -1) + ACE_ERROR ((LM_ERROR, "Exit_Hook Register failed.\n")); +} + +// The STDIN thread has exited. This means the user hit ^C. We can +// end the event loop and delete ourself. + +int +STDIN_Handler::handle_signal (int, siginfo_t *si, ucontext_t *) +{ + ACE_ASSERT (this->thr_handle_ == si->si_handle_); + ACE_Reactor::end_event_loop (); + return 0; +} + +int +main (int argc, char *argv[]) +{ + // Let the proactor know that it will be used with Reactor + // Create specific proactor + ACE_WIN32_Proactor win32_proactor (0, 1); + // Get the interface proactor + ACE_Proactor proactor (&win32_proactor); + // Put it as the instance. + ACE_Proactor::instance (&proactor); + + // Open handler for remote peer communications this will run from + // the main thread. + Peer_Handler peer_handler (argc, argv); + + if (peer_handler.open () == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%p open failed, errno = %d.\n", + "peer_handler", errno), 0); + + // Open active object for reading from stdin. + STDIN_Handler stdin_handler (peer_handler); + + // Spawn thread. + if (stdin_handler.open () == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%p open failed, errno = %d.\n", + "stdin_handler", errno), 0); + + // Register proactor with Reactor so that we can demultiplex + // "waitable" events and I/O operations from a single thread. + if (ACE_Reactor::instance ()->register_handler + (ACE_Proactor::instance ()->implementation ()) != 0) + ACE_ERROR_RETURN ((LM_ERROR, "%p failed to register Proactor.\n", + argv[0]), -1); + + // Run main event demultiplexor. + ACE_Reactor::run_event_loop (); + + // Remove proactor with Reactor. + if (ACE_Reactor::instance ()->remove_handler + (ACE_Proactor::instance (), ACE_Event_Handler::DONT_CALL) != 0) + ACE_ERROR_RETURN ((LM_ERROR, "%p failed to register Proactor.\n", + argv[0]), -1); + + return 0; +} diff --git a/examples/Reactor/WFMO_Reactor/test_timeouts.cpp b/examples/Reactor/WFMO_Reactor/test_timeouts.cpp new file mode 100644 index 00000000000..8fa41f46a66 --- /dev/null +++ b/examples/Reactor/WFMO_Reactor/test_timeouts.cpp @@ -0,0 +1,80 @@ +// $Id$ +// +// ============================================================================ +// +// = LIBRARY +// examples +// +// = FILENAME +// test_timeouts.cpp +// +// = DESCRIPTION +// +// This example application shows how to write Reactor event +// loops that handle events for some fixed amount of time. +// +// Run this example (without arguments) to see the timers +// expire. The order should be: +// +// foo, bar, foo, bar, foo, foo, bar, foo, bar, foo +// +// = AUTHOR +// Tim Harrison +// Irfan Pyarali +// +// ============================================================================ + +#include "ace/Reactor.h" +#include "ace/Service_Config.h" +#include "ace/OS.h" + +ACE_RCSID(WFMO_Reactor, test_timeouts, "$Id$") + +class Timeout_Handler : public ACE_Event_Handler +// = TITLE +// Generic timeout handler. +{ +public: + Timeout_Handler (void) + : count_ (0) {} + + virtual int handle_timeout (const ACE_Time_Value &tv, + const void *arg) + // Print out when timeouts occur. + { + ACE_DEBUG ((LM_DEBUG, + "%d timeout occurred for %s.\n", + ++count_, + (char *) arg)); + return 0; + } + +private: + int count_; +}; + +int +main (int, char *[]) +{ + Timeout_Handler handler; + + // Register a 3 second timer. + ACE_Time_Value bar_tv (3); + ACE_Reactor::instance ()->schedule_timer (&handler, + (void *) "Bar", + bar_tv, + bar_tv); + + // Register a 2 second timer. + ACE_Time_Value foo_tv (2); + ACE_Reactor::instance ()->schedule_timer (&handler, + (void *) "Foo", + foo_tv, + foo_tv); + // Handle events for 12 seconds. + ACE_Time_Value run_time (12); + if (ACE_Reactor::run_event_loop(run_time) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p.\n", "main"), -1); + + return 0; +} diff --git a/examples/Reactor/WFMO_Reactor/test_window_messages.cpp b/examples/Reactor/WFMO_Reactor/test_window_messages.cpp new file mode 100644 index 00000000000..f46dea7ad19 --- /dev/null +++ b/examples/Reactor/WFMO_Reactor/test_window_messages.cpp @@ -0,0 +1,90 @@ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// examples +// +// = FILENAME +// test_window_messages.cpp +// +// = DESCRIPTION +// +// Tests the Msg_WFMO_Reactor's ability to handle regular events +// and window messages. +// +// = AUTHOR +// +// Irfan Pyarali <irfan@cs.wustl.edu> +// +// ============================================================================ + +#include "ace/Msg_WFMO_Reactor.h" +#include "ace/Reactor.h" +#include "ace/Auto_Ptr.h" + +ACE_RCSID(WFMO_Reactor, test_window_messages, "$Id$") + +class Event_Handler : public ACE_Event_Handler +{ +public: + int handle_signal (int signum, siginfo_t * = 0, ucontext_t * = 0); + + ACE_Auto_Event handle_; + int iterations_; +}; + +int +Event_Handler::handle_signal (int signum, siginfo_t *, ucontext_t *) +{ + --this->iterations_; + + if (this->iterations_ == 0) + ACE_Reactor::end_event_loop (); + + return 0; +} + +static Event_Handler *global_event_handler; + +void WINAPI +timer_callback (HWND hwnd, + UINT uMsg, + UINT idEvent, + DWORD dwTime) +{ + ACE_DEBUG ((LM_DEBUG, "(%t) timeout occured @ %d\n", dwTime)); + + global_event_handler->handle_.signal (); +} + +int +main (int argc, char** argv) +{ + // Manage memory automagically. + // Note that ordering here is important. + ACE_Reactor_Impl *impl = new ACE_Msg_WFMO_Reactor; + auto_ptr<ACE_Reactor> reactor (new ACE_Reactor (impl)); + ACE_Reactor::instance (reactor.get ()); + auto_ptr<ACE_Reactor_Impl> delete_impl (impl); + + Event_Handler event_handler; + global_event_handler = &event_handler; + + event_handler.iterations_ = 5; + int result = ACE_Reactor::instance ()->register_handler (&event_handler, + event_handler.handle_.handle ()); + ACE_ASSERT (result == 0); + + ACE_Time_Value timeout (1); + result = ::SetTimer (NULL, // handle of window for timer messages + NULL, // timer identifier + timeout.msec (), // time-out value + (TIMERPROC) &timer_callback // address of timer procedure + ); + ACE_ASSERT (result != 0); + + ACE_Reactor::run_event_loop (); + + return 0; +} |