diff options
author | irfan <irfan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2002-04-25 08:49:37 +0000 |
---|---|---|
committer | irfan <irfan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2002-04-25 08:49:37 +0000 |
commit | a962d760266caec6af0d82038498ecccc8cb76e6 (patch) | |
tree | 3ed7553ac6dda87fa3d5771e1c5684b04fe30413 /tests/MT_Reactor_Upcall_Test.cpp | |
parent | 652766b0ef4e1bfface7cde27a88e9dba336674c (diff) | |
download | ATCD-a962d760266caec6af0d82038498ecccc8cb76e6.tar.gz |
ChangeLogTag: Thu Apr 25 03:34:43 2002 Irfan Pyarali <irfan@cs.wustl.edu>
Diffstat (limited to 'tests/MT_Reactor_Upcall_Test.cpp')
-rw-r--r-- | tests/MT_Reactor_Upcall_Test.cpp | 144 |
1 files changed, 106 insertions, 38 deletions
diff --git a/tests/MT_Reactor_Upcall_Test.cpp b/tests/MT_Reactor_Upcall_Test.cpp index 95266a08fee..22b5870d003 100644 --- a/tests/MT_Reactor_Upcall_Test.cpp +++ b/tests/MT_Reactor_Upcall_Test.cpp @@ -33,7 +33,39 @@ int number_of_messages = 10; int sleep_time_in_msec = 100; int lock_upcall = 1; static const char *message = -"Hello there! Hope you get this message"; +"Hello there!"; + +class Guard +{ +public: + Guard (ACE_SYNCH_MUTEX &lock) + : lock_ (lock) + { + if (lock_upcall) + lock.acquire (); + } + + ~Guard (void) + { + if (lock_upcall) + this->lock_.release (); + } + + ACE_SYNCH_MUTEX &lock_; +}; + +struct Message +{ + enum Type + { + DATA, + SHUTDOWN + }; + + Type type_; + size_t size_; + char data_[BUFSIZ]; +}; class Handler : public ACE_Event_Handler { @@ -44,11 +76,13 @@ public: ACE_Pipe pipe_; int number_of_messages_read_; ACE_SYNCH_MUTEX lock_; + int shutdown_; }; Handler::Handler (ACE_Reactor &reactor) : ACE_Event_Handler (&reactor), - number_of_messages_read_ (0) + number_of_messages_read_ (0), + shutdown_ (0) { // Create the pipe. int result @@ -66,52 +100,62 @@ Handler::Handler (ACE_Reactor &reactor) int Handler::handle_input (ACE_HANDLE fd) { - if (lock_upcall) - this->lock_.acquire (); + Guard monitor (this->lock_); - if (this->number_of_messages_read_ == number_of_messages) - { - // Should not happen, but I saw this happen occasionally with the - // WFMO_Reactor + // If we have been shutdown, return. + if (this->shutdown_) + return 0; + // Read fixed part of message. + Message message; + size_t fixed_size_of_message = + sizeof Message::Type + sizeof size_t; + + ssize_t result = + ACE::recv_n (fd, + &message, + fixed_size_of_message); + ACE_ASSERT (result == ssize_t (fixed_size_of_message)); + + // On shutdown message, stop the event loop. + if (message.type_ == Message::SHUTDOWN) + { ACE_DEBUG ((LM_DEBUG, - "(%t) Incorrect/unexpected wakeup for message %d: ignoring....\n", - this->number_of_messages_read_ + 1)); + "(%t) Shutdown message\n")); - if (lock_upcall) - this->lock_.release (); + this->shutdown_ = 1; - return 0; - } + this->reactor ()->end_reactor_event_loop (); - ACE_DEBUG ((LM_DEBUG, - "(%t) Starting to handle message %d\n", - this->number_of_messages_read_ + 1)); + // Remove self from Reactor. + return -1; + } - char buffer[BUFSIZ]; - ssize_t result = + // Else it is a data message: read the data. + result = ACE::recv_n (fd, - buffer, - ACE_OS::strlen (message)); + &message.data_, + message.size_); + ACE_ASSERT (result == ssize_t (message.size_)); - ACE_ASSERT (result > 0); - buffer[result] = '\0'; + message.data_[result] = '\0'; + ACE_DEBUG ((LM_DEBUG, + "(%t) Starting to handle message %d: %s\n", + this->number_of_messages_read_ + 1, + message.data_)); + + // Process message (sleep). ACE_OS::sleep (ACE_Time_Value (0, sleep_time_in_msec * 1000)); + // Keep count. this->number_of_messages_read_++; - if (this->number_of_messages_read_ == number_of_messages) - this->reactor ()->end_reactor_event_loop (); - ACE_DEBUG ((LM_DEBUG, "(%t) Completed handling message %d\n", this->number_of_messages_read_)); - if (lock_upcall) - this->lock_.release (); - return 0; } @@ -142,11 +186,24 @@ test_reactor_upcall (ACE_Reactor &reactor) Handler handler (reactor); Event_Loop_Task event_loop_task (reactor); + // Start up the event loop threads. int result = event_loop_task.activate (THR_NEW_LWP | THR_JOINABLE, number_of_event_loop_threads); ACE_ASSERT (result == 0); + // Data message. + Message data_message; + data_message.type_ = + Message::DATA; + data_message.size_ = + ACE_OS::strlen (message); + ACE_OS::strcpy (data_message.data_, + message); + + size_t size_of_data_message = + sizeof Message::Type + sizeof size_t + data_message.size_; + for (int i = 0; i < number_of_messages; ++i) @@ -154,24 +211,35 @@ test_reactor_upcall (ACE_Reactor &reactor) // This should trigger a call to <handle_input>. result = ACE::send_n (handler.pipe_.write_handle (), - message, - ACE_OS::strlen (message)); - ACE_ASSERT (result == ssize_t (ACE_OS::strlen (message))); + &data_message, + size_of_data_message); + ACE_ASSERT (result == ssize_t (size_of_data_message)); } - event_loop_task.wait (); + // We are done: send shutdown message. + Message shutdown_message; + shutdown_message.type_ = + Message::SHUTDOWN; + shutdown_message.size_ = 0; + + size_t size_of_shutdown_message = + sizeof Message::Type + sizeof size_t; + // This should trigger a call to <handle_input>. result = - reactor.remove_handler (handler.pipe_.read_handle (), - ACE_Event_Handler::READ_MASK | - ACE_Event_Handler::DONT_CALL); - ACE_ASSERT (result == 0); + ACE::send_n (handler.pipe_.write_handle (), + &shutdown_message, + size_of_shutdown_message); + ACE_ASSERT (result == ssize_t (size_of_shutdown_message)); + + // Wait for the event loop tasks to exit. + event_loop_task.wait (); } int parse_args (int argc, ACE_TCHAR *argv[]) { - ACE_Get_Opt get_opt (argc, argv, ACE_TEXT("t:m:s:l:")); + ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("t:m:s:l:")); int c; |