diff options
Diffstat (limited to 'ACE/tests/Message_Queue_Test_Ex.cpp')
-rw-r--r-- | ACE/tests/Message_Queue_Test_Ex.cpp | 677 |
1 files changed, 677 insertions, 0 deletions
diff --git a/ACE/tests/Message_Queue_Test_Ex.cpp b/ACE/tests/Message_Queue_Test_Ex.cpp new file mode 100644 index 00000000000..fd4f714a42b --- /dev/null +++ b/ACE/tests/Message_Queue_Test_Ex.cpp @@ -0,0 +1,677 @@ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// tests +// +// = FILENAME +// Message_Queue_Test_Ex.cpp +// +// = DESCRIPTION +// This is: +// 1. A simple test of the ACE_Message_Queue_Ex that executes +// a performance measurement test for both single-threaded +// (null synch) and thread-safe ACE_Message_Queue_Ex +// instantiations. +// 2. An example of using a user-defined class to parameterize +// ACE_Message_Queue_Ex. +// +// = AUTHORS +// Michael Vitlo <mvitalo@sprynet.com>, copied the code from: +// Irfan Pyarali <irfan@cs.wustl.edu> and +// David L. Levine <levine@cs.wustl.edu> +// +// ============================================================================ + +#include "test_config.h" +#include "ace/Thread_Manager.h" + +#include "ace/Message_Queue.h" +#include "ace/Synch_Traits.h" +#include "ace/Null_Mutex.h" +#include "ace/Null_Condition.h" +#include "ace/High_Res_Timer.h" +#include "ace/Message_Block.h" +#include "ace/OS_NS_sys_time.h" +#include "ace/Barrier.h" +#include "Message_Queue_Test_Ex.h" // Declares User_Class + +const ACE_TCHAR usage[] = + ACE_TEXT ("usage: Message_Queue_Test_Ex <number of messages>\n"); + +typedef ACE_Message_Queue_Ex<User_Class, ACE_NULL_SYNCH> QUEUE; + +static const int MAX_MESSAGES = 10000; +static const char test_message[] = "ACE_Message_Queue_Ex Test Message"; + +static int max_messages = MAX_MESSAGES; +static int chain_limit = 4; +static ACE_Barrier tester_barrier (2); + +// Dynamically allocate to avoid a static. +static ACE_High_Res_Timer *timer = 0; + +// Helper printing function +static void +print_message (const ACE_TCHAR *message) +{ + ACE_Time_Value tv; + timer->elapsed_time (tv); + ACE_DEBUG ((LM_INFO, + ACE_TEXT ("%s: %u messages took %u msec (%f msec/message)\n"), + message, + max_messages, + tv.msec (), + (double) tv.msec () / max_messages)); +} + +#if defined (ACE_HAS_THREADS) +typedef ACE_Message_Queue_Ex<User_Class, ACE_MT_SYNCH> SYNCH_QUEUE; + +struct Queue_Wrapper +{ + // = TITLE + // Container for data passed to sender and receiver in + // performance test. + // + // = DESCRIPTION + // For use in multithreaded performance test. + + SYNCH_QUEUE *q_; + // The message queue. + + User_Class **send_block_; + // Pointer to messages blocks for sender to send to reciever. + + Queue_Wrapper (void) + : q_ (0), send_block_ (0) + { + } + // Default constructor. +}; + +struct MQ_Ex_N_Tester_Wrapper +{ + // = TITLE + // Container for data passed to sender in the MQ_Ex_N_Tester + // performance test. + // + // = DESCRIPTION + // For use in multithreaded performance test. + MQ_Ex_N_Tester *tester_; + User_Class *head_send_block_; +}; + +#endif /* ACE_HAS_THREADS */ + +// Encapsulates the sent messages creation and destruction +struct Send_Messages +{ + Send_Messages (int number_of_messages, int chain_limit): + send_block_ (0), + number_of_messages_ (number_of_messages), + chain_limit_ (chain_limit) + { + } + + int create_messages (const char test_message[]) + { + int limit = this->number_of_messages_ / this->chain_limit_; + ACE_NEW_RETURN (this->send_block_, + User_Class *[limit], + -1); + + int i, j; + for (i = 0; i < limit; ++i) + { + User_Class *&temp1 = this->send_block_[i]; + ACE_NEW_RETURN (temp1, + User_Class (test_message), + -1); + User_Class *tail = temp1; + for (j = 1; j < this->chain_limit_; ++j) + { + User_Class *temp2 = 0; + ACE_NEW_RETURN (temp2, + User_Class (test_message), + -1); + tail->next (temp2); + tail = temp2; + } + } + this->head_send_block_ = this->send_block_[0]; + return 0; + } + + ~Send_Messages () + { + int j, i = 0; + int limit = this->number_of_messages_ / this->chain_limit_; + for (; i < limit; ++i) + { + User_Class *&temp1 = this->send_block_[i]; + for (j = 0; j < this->chain_limit_; ++j) + { + User_Class *temp2 = temp1->next (); + delete temp1; + temp1 = temp2; + } + } + delete [] this->send_block_; + } + + User_Class * head_send_block_; + User_Class ** send_block_; + int number_of_messages_; + int chain_limit_; +}; + +// Encapsulates the received messages creation and destruction +struct Receive_Messages +{ + Receive_Messages (int number_of_messages) : + receive_block_ (0), + number_of_messages_ (number_of_messages) + { + } + + int create (void) + { + ACE_NEW_RETURN (this->receive_block_, + User_Class *[this->number_of_messages_], + -1); + return 0; + } + + ~Receive_Messages () + { + delete [] this->receive_block_; + } + + User_Class **receive_block_; + int number_of_messages_; +}; + +static int +single_thread_performance_test (void) +{ + const char test_message[] = + "ACE_Message_Queue_Ex Test Message"; + const ACE_TCHAR *message = + ACE_TEXT ("ACE_Message_Queue_Ex<ACE_NULL_SYNCH>, single thread"); + + // Create a message queue. + QUEUE *msgq = 0; + + ACE_NEW_RETURN (msgq, + QUEUE, + -1); + + // Create the messages. Allocate off the heap in case messages is + // large relative to the amount of stack space available. + User_Class **send_block = 0; + ACE_NEW_RETURN (send_block, + User_Class *[max_messages], + -1); + + int i = 0; + + for (i = 0; i < max_messages; ++i) + ACE_NEW_RETURN (send_block[i], + User_Class (test_message), + -1); + + User_Class **receive_block_p = 0; + ACE_NEW_RETURN (receive_block_p, + User_Class *[max_messages], + -1); + + timer->start (); + + // Send/receive the messages. + for (i = 0; i < max_messages; ++i) + { + if (msgq->enqueue_tail (send_block[i]) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("enqueue")), + -1); + + if (msgq->dequeue_head (receive_block_p[i]) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("dequeue_head")), + -1); + } + + timer->stop (); + print_message (message); + timer->reset (); + + delete [] receive_block_p; + + for (i = 0; i < max_messages; ++i) + delete send_block[i]; + delete [] send_block; + delete msgq; + + return 0; +} + +int +MQ_Ex_N_Tester::single_thread_performance_test (void) +{ + // Create the messages. Allocate off the heap in case messages is + // large relative to the amount of stack space available. + + if ((0 != this->test_enqueue_tail ()) || + (0 != this->test_enqueue_head ()) ) + { + return -1; + } + + return 0; +} + +int +MQ_Ex_N_Tester::test_enqueue_tail (void) +{ + const ACE_TCHAR *message = + ACE_TEXT ("ACE_Message_Queue_Ex_N<ACE_NULL_SYNCH>, test_enqueue_tail"); + + // Send_Messages creates messages and deletes them when it gets out of scope + Send_Messages messages (max_messages, chain_limit); + if (-1 == messages.create_messages (test_message)) + { + return -1; + } + Receive_Messages r_messages (max_messages); + if (-1 == r_messages.create ()) + { + return -1; + } + + // prepare + int limit = max_messages / chain_limit; + timer->start (); + // Send with just one call + for (int i = 0; i < limit; ++i) + { + if (-1 == this->st_queue_.enqueue_tail (messages.send_block_[i])) + { + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("enqueue_tail_n")), + -1); + } + + for (int j = 0, k = 0; j < chain_limit; ++j, ++k) + { + if (this->st_queue_.dequeue_head (r_messages.receive_block_[k]) == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("dequeue_head")), + -1); + } + } + } + timer->stop (); + + print_message (message); + + timer->reset (); + + return 0; +} + +int +MQ_Ex_N_Tester::test_enqueue_head (void) +{ + const ACE_TCHAR *message = + ACE_TEXT ("ACE_Message_Queue_Ex_N<ACE_NULL_SYNCH>, test_enqueue_head"); + + // Send_Messages creates messages and deletes them when it gets out of scope + Send_Messages messages (max_messages, chain_limit); + if (-1 == messages.create_messages (test_message)) + { + return -1; + } + Receive_Messages r_messages (max_messages); + if (-1 == r_messages.create ()) + { + return -1; + } + + // prepare + int i, j, k = 0; + + int limit = max_messages / chain_limit; + timer->start (); + + // Send/receive the messages. + // Send with just one call + for (i = 0; i < limit; ++i) + { + if (-1 == this->st_queue_.enqueue_head (messages.send_block_[i])) + { + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("enqueue_tail_n")), + -1); + } + + for (j = 0; j < chain_limit; ++j, ++k) + { + if (this->st_queue_.dequeue_head (r_messages.receive_block_[k]) == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("dequeue_head")), + -1); + } + } + } + timer->stop (); + + print_message (message); + + timer->reset (); + + return 0; + +} + +#if defined (ACE_HAS_THREADS) + +static void * +receiver (void *arg) +{ + Queue_Wrapper *queue_wrapper = reinterpret_cast<Queue_Wrapper *> (arg); + int i; + + User_Class **receive_block_p = 0; + ACE_NEW_RETURN (receive_block_p, + User_Class *[max_messages], + (void *) -1); + + for (i = 0; i < max_messages; ++i) + if (queue_wrapper->q_->dequeue_head (receive_block_p[i]) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("dequeue_head")), + 0); + timer->stop (); + + delete [] receive_block_p; + + return 0; +} + +static void * +sender (void *arg) +{ + Queue_Wrapper *queue_wrapper = reinterpret_cast<Queue_Wrapper *> (arg); + int i; + + timer->start (); + + // Send the messages. + for (i = 0; i < max_messages; ++i) + if (queue_wrapper->q_-> + enqueue_tail (queue_wrapper->send_block_[i]) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("enqueue")), + 0); + return 0; +} + +static int +performance_test (void) +{ + Queue_Wrapper queue_wrapper; + const ACE_TCHAR *message = + ACE_TEXT ("ACE_Message_Queue_Ex<ACE_SYNCH>"); + int i = 0; + + // Create the messages. Allocate off the heap in case messages is + // large relative to the amount of stack space available. Allocate + // it here instead of in the sender, so that we can delete it after + // the _receiver_ is done. + User_Class **send_block = 0; + ACE_NEW_RETURN (send_block, + User_Class *[max_messages], + -1); + + for (i = 0; i < max_messages; ++i) + ACE_NEW_RETURN (send_block[i], + User_Class (test_message), + -1); + + queue_wrapper.send_block_ = send_block; + + ACE_NEW_RETURN (queue_wrapper.q_, + SYNCH_QUEUE, + -1); + + if (ACE_Thread_Manager::instance ()->spawn ((ACE_THR_FUNC) sender, + &queue_wrapper, + THR_BOUND) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("spawning sender thread")), + -1); + + if (ACE_Thread_Manager::instance ()->spawn ((ACE_THR_FUNC) receiver, + &queue_wrapper, + THR_BOUND) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("spawning receiver thread")), + -1); + + ACE_Thread_Manager::instance ()->wait (); + print_message (message); + timer->reset (); + + delete queue_wrapper.q_; + queue_wrapper.q_ = 0; + + for (i = 0; i < max_messages; ++i) + delete send_block[i]; + delete [] send_block; + + return 0; +} + +int +MQ_Ex_N_Tester::performance_test (void) +{ + const ACE_TCHAR *message = + ACE_TEXT ("ACE_Message_Queue_Ex_N<ACE_SYNCH>"); + + Send_Messages messages (max_messages, chain_limit); + if (-1 == messages.create_messages (test_message)) + { + return -1; + } + + MQ_Ex_N_Tester_Wrapper tester_wrapper; + tester_wrapper.head_send_block_ = messages.head_send_block_; + tester_wrapper.tester_ = this; + + if (ACE_Thread_Manager::instance ()->spawn ((ACE_THR_FUNC) &MQ_Ex_N_Tester::sender, + &tester_wrapper, + THR_BOUND) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("spawning sender thread")), + -1); + + if (ACE_Thread_Manager::instance ()->spawn ((ACE_THR_FUNC) &MQ_Ex_N_Tester::receiver, + this, + THR_BOUND) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("spawning receiver thread")), + -1); + + ACE_Thread_Manager::instance ()->wait (); + + print_message (message); + + timer->reset (); + + return 0; +} + +ACE_THR_FUNC_RETURN +MQ_Ex_N_Tester::receiver (void *args) +{ + MQ_Ex_N_Tester *tester = reinterpret_cast<MQ_Ex_N_Tester *> (args); + + User_Class **receive_block_p = 0; + ACE_NEW_RETURN (receive_block_p, + User_Class *[max_messages], + (ACE_THR_FUNC_RETURN) -1); + + int i; + tester_barrier.wait (); + for (i = 0; i < max_messages; ++i) + { + if (tester->mt_queue_.dequeue_head (receive_block_p[i]) == -1) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("dequeue_head"))); + return (ACE_THR_FUNC_RETURN) -1; + } + } + timer->stop (); + + delete [] receive_block_p; + + return 0; +} + +ACE_THR_FUNC_RETURN +MQ_Ex_N_Tester::sender (void *args) +{ + MQ_Ex_N_Tester_Wrapper *tester_wrapper = + reinterpret_cast<MQ_Ex_N_Tester_Wrapper *> (args); + MQ_Ex_N_Tester *tester = tester_wrapper->tester_; + + Send_Messages messages (max_messages, chain_limit); + if (-1 == messages.create_messages (test_message)) + { + return (ACE_THR_FUNC_RETURN) -1; + } + int limit = max_messages / chain_limit; + tester_barrier.wait (); + timer->start (); + // Send/receive the messages. + timer->start (); + // Send with just one call + for (int i = 0; i < limit; ++i) + { + if (-1 == tester->mt_queue_.enqueue_tail (messages.send_block_[i])) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("enqueue_tail_n"))); + return (ACE_THR_FUNC_RETURN) -1; + } + } + return 0; +} + +#endif /* ACE_HAS_THREADS */ + +int basic_queue_test (ACE_Message_Queue_Ex<User_Class, ACE_SYNCH>& q) +{ + int status = 0; + if (!q.is_empty ()) + { + ACE_ERROR ((LM_ERROR, ACE_TEXT ("New queue is not empty!\n"))); + status = 1; + } + else + { + User_Class *b; + ACE_Time_Value tv (ACE_OS::gettimeofday ()); // Now + if (q.dequeue_head (b, &tv) != -1) + { + ACE_ERROR ((LM_ERROR, ACE_TEXT ("Dequeued from empty queue!\n"))); + status = 1; + } + else if (errno != EWOULDBLOCK) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("Dequeue timeout should be EWOULDBLOCK, got"))); + status = 1; + } + else + { + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Timed dequeue test: OK\n"))); + status = 0; // All is well + } + } + + return status; +} + +int +run_main (int argc, ACE_TCHAR *argv[]) +{ + ACE_START_TEST (ACE_TEXT ("Message_Queue_Test_Ex")); + + if (argc == 2) + if (! ACE_OS::strcmp (argv[1], ACE_TEXT ("-?"))) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%s/n"), + usage)); + else + max_messages = ACE_OS::atoi (argv[1]); + + int status = 0; + + // Be sure that the a timed out get sets the error code properly. + ACE_Message_Queue_Ex<User_Class, ACE_SYNCH> q1; + ACE_Message_Queue_Ex_N<User_Class, ACE_SYNCH> q2; + if (0 != basic_queue_test (q1) || + 0 != basic_queue_test (q2)) + { + ++status; + } + + ACE_NEW_RETURN (timer, + ACE_High_Res_Timer, + -1); + + status += single_thread_performance_test (); + +#if defined (ACE_HAS_THREADS) + status += performance_test (); +#endif /* ACE_HAS_THREADS */ + + { + MQ_Ex_N_Tester ex_n_tester; + status += ex_n_tester.single_thread_performance_test (); +#if defined (ACE_HAS_THREADS) + status += ex_n_tester.performance_test (); +#endif /* ACE_HAS_THREADS */ + } + + if (status != 0) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("test failed"))); + delete timer; + timer = 0; + + ACE_END_TEST; + return status; +} |