summaryrefslogtreecommitdiff
path: root/tests/MT_Reactor_Upcall_Test.cpp
diff options
context:
space:
mode:
authorirfan <irfan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2002-04-25 08:49:37 +0000
committerirfan <irfan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2002-04-25 08:49:37 +0000
commita962d760266caec6af0d82038498ecccc8cb76e6 (patch)
tree3ed7553ac6dda87fa3d5771e1c5684b04fe30413 /tests/MT_Reactor_Upcall_Test.cpp
parent652766b0ef4e1bfface7cde27a88e9dba336674c (diff)
downloadATCD-a962d760266caec6af0d82038498ecccc8cb76e6.tar.gz
ChangeLogTag: Thu Apr 25 03:34:43 2002 Irfan Pyarali <irfan@cs.wustl.edu>
Diffstat (limited to 'tests/MT_Reactor_Upcall_Test.cpp')
-rw-r--r--tests/MT_Reactor_Upcall_Test.cpp144
1 files changed, 106 insertions, 38 deletions
diff --git a/tests/MT_Reactor_Upcall_Test.cpp b/tests/MT_Reactor_Upcall_Test.cpp
index 95266a08fee..22b5870d003 100644
--- a/tests/MT_Reactor_Upcall_Test.cpp
+++ b/tests/MT_Reactor_Upcall_Test.cpp
@@ -33,7 +33,39 @@ int number_of_messages = 10;
int sleep_time_in_msec = 100;
int lock_upcall = 1;
static const char *message =
-"Hello there! Hope you get this message";
+"Hello there!";
+
+class Guard
+{
+public:
+ Guard (ACE_SYNCH_MUTEX &lock)
+ : lock_ (lock)
+ {
+ if (lock_upcall)
+ lock.acquire ();
+ }
+
+ ~Guard (void)
+ {
+ if (lock_upcall)
+ this->lock_.release ();
+ }
+
+ ACE_SYNCH_MUTEX &lock_;
+};
+
+struct Message
+{
+ enum Type
+ {
+ DATA,
+ SHUTDOWN
+ };
+
+ Type type_;
+ size_t size_;
+ char data_[BUFSIZ];
+};
class Handler : public ACE_Event_Handler
{
@@ -44,11 +76,13 @@ public:
ACE_Pipe pipe_;
int number_of_messages_read_;
ACE_SYNCH_MUTEX lock_;
+ int shutdown_;
};
Handler::Handler (ACE_Reactor &reactor)
: ACE_Event_Handler (&reactor),
- number_of_messages_read_ (0)
+ number_of_messages_read_ (0),
+ shutdown_ (0)
{
// Create the pipe.
int result
@@ -66,52 +100,62 @@ Handler::Handler (ACE_Reactor &reactor)
int
Handler::handle_input (ACE_HANDLE fd)
{
- if (lock_upcall)
- this->lock_.acquire ();
+ Guard monitor (this->lock_);
- if (this->number_of_messages_read_ == number_of_messages)
- {
- // Should not happen, but I saw this happen occasionally with the
- // WFMO_Reactor
+ // If we have been shutdown, return.
+ if (this->shutdown_)
+ return 0;
+ // Read fixed part of message.
+ Message message;
+ size_t fixed_size_of_message =
+ sizeof Message::Type + sizeof size_t;
+
+ ssize_t result =
+ ACE::recv_n (fd,
+ &message,
+ fixed_size_of_message);
+ ACE_ASSERT (result == ssize_t (fixed_size_of_message));
+
+ // On shutdown message, stop the event loop.
+ if (message.type_ == Message::SHUTDOWN)
+ {
ACE_DEBUG ((LM_DEBUG,
- "(%t) Incorrect/unexpected wakeup for message %d: ignoring....\n",
- this->number_of_messages_read_ + 1));
+ "(%t) Shutdown message\n"));
- if (lock_upcall)
- this->lock_.release ();
+ this->shutdown_ = 1;
- return 0;
- }
+ this->reactor ()->end_reactor_event_loop ();
- ACE_DEBUG ((LM_DEBUG,
- "(%t) Starting to handle message %d\n",
- this->number_of_messages_read_ + 1));
+ // Remove self from Reactor.
+ return -1;
+ }
- char buffer[BUFSIZ];
- ssize_t result =
+ // Else it is a data message: read the data.
+ result =
ACE::recv_n (fd,
- buffer,
- ACE_OS::strlen (message));
+ &message.data_,
+ message.size_);
+ ACE_ASSERT (result == ssize_t (message.size_));
- ACE_ASSERT (result > 0);
- buffer[result] = '\0';
+ message.data_[result] = '\0';
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) Starting to handle message %d: %s\n",
+ this->number_of_messages_read_ + 1,
+ message.data_));
+
+ // Process message (sleep).
ACE_OS::sleep (ACE_Time_Value (0,
sleep_time_in_msec * 1000));
+ // Keep count.
this->number_of_messages_read_++;
- if (this->number_of_messages_read_ == number_of_messages)
- this->reactor ()->end_reactor_event_loop ();
-
ACE_DEBUG ((LM_DEBUG,
"(%t) Completed handling message %d\n",
this->number_of_messages_read_));
- if (lock_upcall)
- this->lock_.release ();
-
return 0;
}
@@ -142,11 +186,24 @@ test_reactor_upcall (ACE_Reactor &reactor)
Handler handler (reactor);
Event_Loop_Task event_loop_task (reactor);
+ // Start up the event loop threads.
int result =
event_loop_task.activate (THR_NEW_LWP | THR_JOINABLE,
number_of_event_loop_threads);
ACE_ASSERT (result == 0);
+ // Data message.
+ Message data_message;
+ data_message.type_ =
+ Message::DATA;
+ data_message.size_ =
+ ACE_OS::strlen (message);
+ ACE_OS::strcpy (data_message.data_,
+ message);
+
+ size_t size_of_data_message =
+ sizeof Message::Type + sizeof size_t + data_message.size_;
+
for (int i = 0;
i < number_of_messages;
++i)
@@ -154,24 +211,35 @@ test_reactor_upcall (ACE_Reactor &reactor)
// This should trigger a call to <handle_input>.
result =
ACE::send_n (handler.pipe_.write_handle (),
- message,
- ACE_OS::strlen (message));
- ACE_ASSERT (result == ssize_t (ACE_OS::strlen (message)));
+ &data_message,
+ size_of_data_message);
+ ACE_ASSERT (result == ssize_t (size_of_data_message));
}
- event_loop_task.wait ();
+ // We are done: send shutdown message.
+ Message shutdown_message;
+ shutdown_message.type_ =
+ Message::SHUTDOWN;
+ shutdown_message.size_ = 0;
+
+ size_t size_of_shutdown_message =
+ sizeof Message::Type + sizeof size_t;
+ // This should trigger a call to <handle_input>.
result =
- reactor.remove_handler (handler.pipe_.read_handle (),
- ACE_Event_Handler::READ_MASK |
- ACE_Event_Handler::DONT_CALL);
- ACE_ASSERT (result == 0);
+ ACE::send_n (handler.pipe_.write_handle (),
+ &shutdown_message,
+ size_of_shutdown_message);
+ ACE_ASSERT (result == ssize_t (size_of_shutdown_message));
+
+ // Wait for the event loop tasks to exit.
+ event_loop_task.wait ();
}
int
parse_args (int argc, ACE_TCHAR *argv[])
{
- ACE_Get_Opt get_opt (argc, argv, ACE_TEXT("t:m:s:l:"));
+ ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("t:m:s:l:"));
int c;