// $Id$ #include "ace/Service_Config.h" #include "ace/Thread_Manager.h" #include "ace/Thread.h" #include "ace/Synch_T.h" ACE_RCSID(Misc, notification, "$Id$") #if defined (ACE_HAS_THREADS) #if defined (CHORUS) // Chorus does not have signal, so we'll stop after a number of rounds. #define MAX_ITERATIONS 3 #else #define MAX_ITERATIONS 10000 #endif /* CHORUS */ class Thread_Handler : public ACE_Event_Handler { // = TITLE // Illustrate how the ACE_Reactor's thread-safe event notification // mechanism works. // // = DESCRIPTION // Handle timeouts in the main thread via the ACE_Reactor and I/O // events in a separate thread. Just before the separate I/O // thread exits it notifies the ACE_Reactor in the main thread // using the ACE_Reactor's notification mechanism. public: Thread_Handler (int delay, int interval, size_t n_threads, size_t max_iterations); // Constructor. Thread_Handler (size_t id, size_t max_iterations); ~Thread_Handler (void); // Destructor. virtual int handle_signal (int signum, siginfo_t * = 0, ucontext_t * = 0); // Handle signals. virtual int handle_exception (ACE_HANDLE); // Print data from main thread. virtual int handle_output (ACE_HANDLE); // Print data from main thread. virtual int handle_timeout (const ACE_Time_Value &, const void *); // Handle timeout events in the main thread. virtual int handle_input (ACE_HANDLE); // General notification messages to the Reactor. virtual int notify (ACE_Time_Value *tv = 0); // Perform notifications. virtual int svc (void); // Handle I/O events in a separate threads. private: static void *svc_run (void *); // Glues C++ to C thread library functions. size_t id_; // ID passed in by Thread_Handler constructor. int iterations_; static sig_atomic_t shutdown_; // Shutting down. // = Timing variables. // Delay factor for timer-driven I/O. static ACE_Time_Value delay_; // Interval factor for Event_Handler timer. static ACE_Time_Value interval_; }; // Shutdown flag. sig_atomic_t Thread_Handler::shutdown_ = 0; // Delay factor for timer-driven I/O. ACE_Time_Value Thread_Handler::delay_; // Interval factor for Event_Handler timer. ACE_Time_Value Thread_Handler::interval_; Thread_Handler::Thread_Handler (size_t id, size_t max_iterations) : id_ (id), iterations_ (max_iterations) { } Thread_Handler::~Thread_Handler (void) { // Cleanup resources so that we don't crash and burn when shutdown. ACE_Event_Handler::remove_stdin_handler (ACE_Reactor::instance (), ACE_Thread_Manager::instance ()); ACE_Reactor::instance ()->cancel_timer (this); } Thread_Handler::Thread_Handler (int delay, int interval, size_t n_threads, size_t max_iterations) : iterations_ (max_iterations) { ACE_Sig_Set sig_set; sig_set.sig_add (SIGQUIT); sig_set.sig_add (SIGINT); delay_.set (delay); interval_.set (interval); this->id_ = 0; if (ACE_Event_Handler::register_stdin_handler (this, ACE_Reactor::instance (), ACE_Thread_Manager::instance ()) == -1) ACE_ERROR ((LM_ERROR, "%p\n", "register_stdin_handler")); else if (ACE_Reactor::instance ()->register_handler (sig_set, this) == -1) ACE_ERROR ((LM_ERROR, "(%t) %p\n", "register_handler")); else if (ACE_Reactor::instance ()->schedule_timer (this, 0, Thread_Handler::delay_, Thread_Handler::interval_) == -1) ACE_ERROR ((LM_ERROR, "(%t) %p\n", "schedule_timer")); // Set up this thread's signal mask to block all the signal in the // , which is inherited by the threads it spawns. ACE_Sig_Guard guard (&sig_set); // Create N new threads of control Thread_Handlers. for (size_t i = 0; i < n_threads; i++) { Thread_Handler *th; ACE_NEW (th, Thread_Handler (i + 1, this->iterations_)); if (ACE_Thread::spawn (ACE_reinterpret_cast (ACE_THR_FUNC, &Thread_Handler::svc_run), ACE_reinterpret_cast (void *, th), THR_NEW_LWP | THR_DETACHED) != 0) ACE_ERROR ((LM_ERROR, "%p\n", "ACE_Thread::spawn")); } // The destructor of unblocks the signal set so that only // this thread receives them! } int Thread_Handler::notify (ACE_Time_Value *timeout) { // Just do something to test the ACE_Reactor's multi-thread // capabilities... if (ACE_Reactor::instance ()->notify (this, ACE_Event_Handler::EXCEPT_MASK, timeout) == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "notification::notify:exception"), -1); else if (ACE_Reactor::instance ()->notify (this, ACE_Event_Handler::WRITE_MASK, timeout) == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "notification::notify:write"), -1); return 0; } // Test stdin handling that uses