// $Id$ // ============================================================================ // // = LIBRARY // tests // // = FILENAME // Message_Queue_Notification_Test.cpp // // = DESCRIPTION // This is a test to illustrate the notification mechanisms in // Message_Queue and its integration with Reactor. // // Note the following things about this example: // // 1. Multiple threads are not required. // 2. You do not have to explicitly notify the Reactor // 3. This code will work the same with any Reactor Implementation // 4. handle_input, handle_exception, handle_output are the only // callbacks supported by this mechanism // 5. The notification mechanism need not notify the Reactor. You can // write your own strategy classes that can do whatever application // specific behavior you want. // // = AUTHOR // Irfan Pyarali // // ============================================================================ #include "test_config.h" #include "ace/Reactor.h" #include "ace/Strategies.h" #include "ace/Task.h" static int iterations = 10; class Message_Handler : public ACE_Task { public: Message_Handler (ACE_Reactor &reactor); virtual int handle_input (ACE_HANDLE); virtual int handle_output (ACE_HANDLE fd = ACE_INVALID_HANDLE); virtual int handle_exception (ACE_HANDLE fd = ACE_INVALID_HANDLE); private: int process_message (void); void make_message (void); ACE_Reactor_Notification_Strategy notification_strategy_; }; Message_Handler::Message_Handler (ACE_Reactor &reactor) // First time handle_input will be called : notification_strategy_ (&reactor, this, ACE_Event_Handler::READ_MASK) { this->msg_queue ()->notification_strategy (&this->notification_strategy_); this->make_message (); } int Message_Handler::handle_input (ACE_HANDLE) { ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("Message_Handler::handle_input\n"))); // Next time handle_output will be called this->notification_strategy_.mask (ACE_Event_Handler::WRITE_MASK); return process_message (); } int Message_Handler::handle_output (ACE_HANDLE fd) { ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("Message_Handler::handle_output\n"))); ACE_UNUSED_ARG (fd); // Next time handle_exception will be called this->notification_strategy_.mask (ACE_Event_Handler::EXCEPT_MASK); return process_message (); } int Message_Handler::handle_exception (ACE_HANDLE fd) { ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("Message_Handler::handle_exception\n"))); ACE_UNUSED_ARG (fd); // Next time handle_input will be called this->notification_strategy_.mask (ACE_Event_Handler::READ_MASK); return process_message (); } int Message_Handler::process_message (void) { ACE_Message_Block *mb; if (this->getq (mb, (ACE_Time_Value *) &ACE_Time_Value::zero) == -1) ACE_ERROR_RETURN ((LM_ERROR, ASYS_TEXT ("%p\n"), ASYS_TEXT ("dequeue_head")), -1); else { ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("message received = %s\n"), mb->rd_ptr ())); delete mb; } this->make_message (); return 0; } void Message_Handler::make_message (void) { if (--iterations > 0) { ACE_Message_Block *mb = new ACE_Message_Block ((char *) ASYS_TEXT ("hello")); ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("sending message\n"))); this->putq (mb); } } int main (int, ASYS_TCHAR *[]) { ACE_START_TEST (ASYS_TEXT ("Message_Queue_Notifications_Test")); #if defined (ACE_HAS_THREADS) ACE_Reactor reactor; Message_Handler mh (reactor); while (iterations > 0) reactor.handle_events (); #else ACE_ERROR ((LM_ERROR, ASYS_TEXT ("threads not supported on this platform\n"))); #endif /* ACE_HAS_THREADS */ ACE_END_TEST; return 0; }