summaryrefslogtreecommitdiff
path: root/ACE/tests/Message_Queue_Test.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ACE/tests/Message_Queue_Test.cpp')
-rw-r--r--ACE/tests/Message_Queue_Test.cpp970
1 files changed, 970 insertions, 0 deletions
diff --git a/ACE/tests/Message_Queue_Test.cpp b/ACE/tests/Message_Queue_Test.cpp
new file mode 100644
index 00000000000..77ea8e51d7a
--- /dev/null
+++ b/ACE/tests/Message_Queue_Test.cpp
@@ -0,0 +1,970 @@
+
+//=============================================================================
+/**
+ * @file Message_Queue_Test.cpp
+ *
+ * $Id$
+ *
+ * This is:
+ * 0) a test that ensures key ACE_Message_Queue features are
+ * working properly, including timeouts and priorities
+ * 1) a simple test of the ACE_Message_Queue that illustrates how to
+ * use the forward and reverse iterators
+ * 2) a simple performance measurement test for both single-threaded
+ * (null synch), thread-safe ACE_Message_Queues, and
+ * ACE_Message_Queue_Vx, which wraps VxWorks message queues
+ * 3) a test/usage example of ACE_Message_Queue_Vx
+ * 4) a test of the message counting in a message queue under load.
+ *
+ * @author Irfan Pyarali <irfan@cs.wustl.edu>
+ * @author David L. Levine <levine@cs.wustl.edu>
+ * @author and Douglas C. Schmidt <schmidt@vanderbilt.edu>
+ */
+//=============================================================================
+
+
+#include "test_config.h"
+#include "ace/Atomic_Op.h"
+#include "ace/Thread_Manager.h"
+#include "ace/Message_Queue.h"
+#include "ace/Message_Queue_NT.h"
+#include "ace/Message_Queue_Vx.h"
+#include "ace/Synch_Traits.h"
+#include "ace/Null_Mutex.h"
+#include "ace/Null_Condition.h"
+#include "ace/High_Res_Timer.h"
+#include "ace/Task.h"
+#include "ace/OS_NS_stdio.h"
+#include "ace/OS_NS_string.h"
+#include "ace/OS_NS_sys_time.h"
+#include "ace/OS_NS_unistd.h"
+
+
+
+const ACE_TCHAR usage[] = ACE_TEXT ("usage: Message_Queue_Test <number of messages>\n");
+
+typedef ACE_Message_Queue<ACE_NULL_SYNCH> QUEUE;
+typedef ACE_Message_Queue_Iterator<ACE_NULL_SYNCH> ITERATOR;
+typedef ACE_Message_Queue_Reverse_Iterator<ACE_NULL_SYNCH> REVERSE_ITERATOR;
+
+#if defined (ACE_HAS_WINCE)
+static const int MESSAGE_FACTOR = 10000;
+#else
+static const int MESSAGE_FACTOR = 100000;
+#endif
+static const int MAX_MESSAGES = 10000;
+static const int MAX_MESSAGE_SIZE = 32;
+static const char test_message[] = "ACE_Message_Queue Test Message";
+
+static int max_messages = MAX_MESSAGES;
+
+// Dynamically allocate to avoid a static.
+static ACE_High_Res_Timer *timer = 0;
+
+#if defined (ACE_HAS_THREADS)
+typedef ACE_Message_Queue<ACE_MT_SYNCH> SYNCH_QUEUE;
+
+struct Queue_Wrapper
+{
+ // = TITLE
+ // Container for data passed to sender and receiver in
+ // performance test.
+ //
+ // = DESCRIPTION
+ // For use in multithreaded performance test.
+
+ ACE_Message_Queue_Base *q_;
+ // The message queue.
+
+ ACE_Message_Block **send_block_;
+ // Pointer to messages blocks for sender to send to reciever.
+
+ Queue_Wrapper (void)
+ : q_ (0), send_block_ (0)
+ {
+ }
+ // Default constructor.
+};
+
+// For the message counting test, there are two tasks, producer and consumer.
+// Each will spawn a number of threads, and the two tasks share a queue.
+class Counting_Test_Producer : public ACE_Task<ACE_MT_SYNCH>
+{
+public:
+ Counting_Test_Producer (ACE_Message_Queue<ACE_MT_SYNCH> *queue)
+ : ACE_Task<ACE_MT_SYNCH> (0, queue), sequence_ (0), produced_ (0) {}
+ virtual int svc (void);
+
+ ACE_Atomic_Op<ACE_Thread_Mutex, long> sequence_;
+ ACE_Atomic_Op<ACE_Thread_Mutex, long> produced_;
+};
+
+class Counting_Test_Consumer : public ACE_Task<ACE_MT_SYNCH>
+{
+public:
+ Counting_Test_Consumer (ACE_Message_Queue<ACE_MT_SYNCH> *queue)
+ : ACE_Task<ACE_MT_SYNCH> (0, queue), consumed_ (0) {}
+ virtual int svc (void);
+
+ ACE_Atomic_Op<ACE_Thread_Mutex, long> consumed_;
+};
+
+int
+Counting_Test_Producer::svc (void)
+{
+ // Going to produce a lot of blocks. Since we don't necessarily want them
+ // all consumed, there's no arrangement with the consumer to be sure that
+ // the same number produced will be consumed; the test check will compare
+ // the number produced, consumed, and remaining to be sure it ends up
+ // correct.
+ // Also, to be sure there's not just 1 producer and 1 consumer pinging
+ // back and forth, make the producers randomly delay between blocks.
+ ACE_OS::srand (static_cast<unsigned int> (ACE_OS::time ()));
+ int multiple = ACE_OS::rand () % 10;
+ int delay_ms = (ACE_OS::rand () % 10) / 2;
+ // The delay usually causes the test to time out in the automated
+ // regression testing. I just left it here in case it's needed someday.
+ delay_ms = 0;
+ long count = MESSAGE_FACTOR * (multiple ? multiple : 1);
+ long produced = 0;
+ // Some of the threads enqueue single blocks, others sequences.
+ long lsequence = ++(this->sequence_);
+ int seq = static_cast<int> (lsequence);
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) Producer will enqueue %B blocks in seq of %d, ")
+ ACE_TEXT ("%d msec delay\n"),
+ (size_t)count,
+ seq,
+ delay_ms));
+
+ ACE_Message_Block *first = 0, *prev = 0, *b = 0;
+ ACE_Time_Value delay (0, delay_ms);
+ ACE_Time_Value timeout (10);
+ while (produced < count)
+ {
+ ACE_NEW_NORETURN (b, ACE_Message_Block (1));
+ if (b == 0)
+ {
+ ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Producer out of memory\n")));
+ break;
+ }
+ first = b;
+ prev = first;
+ for (int s = 1; s < seq; ++s)
+ {
+ ACE_NEW_NORETURN (b, ACE_Message_Block (1));
+ if (b == 0)
+ break;
+ prev->next (b);
+ b->prev (prev);
+ prev = b;
+ }
+ if (b == 0)
+ {
+ if (first != b)
+ {
+ while (first->next () != 0)
+ {
+ b = first->next ();
+ first->release ();
+ first = b;
+ }
+ first->release ();
+ }
+ ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Producer out of memory\n")));
+ break;
+ }
+ // To be sure we can keep going on slow or completed consumers, but not
+ // delay excessively if the consumers have stopped, limit the time
+ // spent waiting to 10 seconds.
+ ACE_Time_Value block = ACE_OS::gettimeofday ();
+ block += timeout;
+ if (this->putq (first, &block) == -1)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) Producer cannot putq; giving up\n")));
+ while (first->next () != 0)
+ {
+ b = first->next ();
+ first->release ();
+ first = b;
+ }
+ first->release ();
+ break;
+ }
+ produced += seq;
+ if (delay_ms)
+ ACE_OS::sleep (delay);
+ }
+ this->produced_ += produced;
+ ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Producer done\n")));
+ return 0;
+}
+
+int
+Counting_Test_Consumer::svc (void)
+{
+ // Consume lots of blocks and release them. To mimic a thread with work
+ // to do, put a small random delay between dequeuing the blocks. Consume
+ // a calculated number of blocks then stop; the test checker will determine
+ // if the number consumed plus the number remaining is correct for the
+ // number produced.
+ unsigned int seed = static_cast<unsigned int> (ACE_OS::time ());
+
+ int multiple = ACE_OS::rand_r (&seed) % 10;
+ int delay_ms = ACE_OS::rand_r (&seed) % 10;
+ // The delay usually causes the test to time out in the automated
+ // regression testing. I just left it here in case it's needed someday.
+ delay_ms = 0;
+ long count = MESSAGE_FACTOR * (multiple ? multiple : 1);
+ long consumed = 0;
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) Consumer will dequeue %B blocks, ")
+ ACE_TEXT ("%d msec delay\n"),
+ (size_t)count,
+ delay_ms));
+ ACE_Message_Block *b = 0;
+ ACE_Time_Value delay (0, delay_ms);
+ ACE_Time_Value timeout (2);
+ while (consumed < count)
+ {
+ // To be sure we can wait in the case of an empty queue, but not
+ // delay excessively if the producers have stopped, limit the time
+ // spent waiting to 2 seconds.
+ ACE_Time_Value block = ACE_OS::gettimeofday ();
+ block += timeout;
+ if (this->getq (b, &block) == -1)
+ {
+ if (errno == EWOULDBLOCK)
+ ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Consumer timed out\n")));
+ else
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("(%t) Consumer %p\n"),
+ ACE_TEXT ("getq")));
+ break;
+ }
+ ++consumed;
+ b->release ();
+ if (delay_ms)
+ ACE_OS::sleep (delay);
+ }
+ this->consumed_ += consumed;
+ ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Consumer done\n")));
+ return 0;
+}
+
+static int
+counting_test (void)
+{
+ ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Starting counting test\n")));
+
+ ACE_Message_Queue<ACE_MT_SYNCH> q (2 * 1024 * 1024); // 2MB high water
+ Counting_Test_Producer p (&q);
+ Counting_Test_Consumer c (&q);
+ // Activate consumers first; if the producers fail to start, consumers will
+ // stop quicker.
+ if (c.activate (THR_NEW_LWP | THR_JOINABLE | THR_INHERIT_SCHED, 5) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("Consumers %p\n"),
+ ACE_TEXT ("activate")),
+ -1);
+ if (p.activate (THR_NEW_LWP | THR_JOINABLE | THR_INHERIT_SCHED, 5) == -1)
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("Producers %p\n"),
+ ACE_TEXT ("activate")));
+ c.wait ();
+ return -1;
+ }
+ // Producers and consumers are both running; wait for them then
+ // check the results.
+ p.wait ();
+ c.wait ();
+ // This compare relies on the flush() method counting blocks as it
+ // walks the chain releasing them, and doesn't rely on the count.
+ int status = 0;
+ long q_count = static_cast<long> (q.message_count ());
+ long remaining = q.flush ();
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("Queue message_count is %b; %b flushed\n"),
+ (ssize_t)q_count,
+ (ssize_t)remaining));
+ if (q_count != remaining)
+ {
+ status = -1;
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("message_count and flushed should be equal!\n")));
+ }
+ long expected = p.produced_.value () - c.consumed_.value ();
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("Produced %b, consumed %b; diff %b\n"),
+ (ssize_t)p.produced_.value (),
+ (ssize_t)c.consumed_.value (),
+ (ssize_t)expected));
+ if (expected != remaining)
+ {
+ status = -1;
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("Producer-consumer diff is %b; should be %b\n"),
+ (ssize_t)expected,
+ (ssize_t)remaining));
+ }
+ ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Ending counting test\n")));
+ return status;
+}
+
+#endif /* ACE_HAS_THREADS */
+
+static int
+iterator_test (void)
+{
+ const int ITERATIONS = 5;
+ ACE_TCHAR buffer[ITERATIONS][BUFSIZ];
+ // Use queue size from of 32 Kb (more if using wide-char), instead of the
+ // default of 16 Kb (defined by ACE_Message_Queue_Base::DEFAULT_HWM),
+ // so that the test runs on machines with 8Kb pagesizes.
+
+ // QUEUE queue (32 * 1024 * sizeof (ACE_TCHAR));
+ QUEUE queue (sizeof(buffer));
+
+ int i;
+
+ for (i = 0; i < ITERATIONS; i++)
+ {
+ ACE_OS::sprintf (buffer[i],
+ ACE_TEXT ("%d"),
+ i + 1);
+
+ ACE_Message_Block *entry = 0;
+ ACE_NEW_RETURN (entry,
+ ACE_Message_Block ((char *) buffer[i],
+ sizeof buffer[i]),
+ -1);
+
+ if (queue.is_full ())
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("QUEUE:: the message queue is full on iteration %u!\n"),
+ i + 1),
+ -1);
+
+ if (queue.enqueue (entry) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("QUEUE::enqueue\n")),
+ -1);
+ }
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("\nForward Iterations\n")));
+ {
+ ITERATOR iterator (queue);
+
+ for (ACE_Message_Block *entry = 0;
+ iterator.next (entry) != 0;
+ iterator.advance ())
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s\n"),
+ entry->base ()));
+ }
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("\nReverse Iterations\n")));
+ {
+ REVERSE_ITERATOR iterator (queue);
+
+ for (ACE_Message_Block *entry = 0;
+ iterator.next (entry) != 0;
+ iterator.advance ())
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s\n"),
+ entry->base ()));
+ }
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("\nForward Iterations\n")));
+ {
+ QUEUE::ITERATOR iterator (queue);
+
+ for (ACE_Message_Block *entry = 0;
+ iterator.next (entry) != 0;
+ iterator.advance ())
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s\n"),
+ entry->base ()));
+ }
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("\nReverse Iterations\n")));
+ {
+ QUEUE::REVERSE_ITERATOR iterator (queue);
+
+ for (ACE_Message_Block *entry = 0;
+ iterator.next (entry) != 0;
+ iterator.advance ())
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s\n"),
+ entry->base ()));
+ }
+
+ return 0;
+}
+
+#if defined (ACE_HAS_THREADS)
+
+static int
+chained_block_test (void)
+{
+
+ QUEUE q;
+ const char * s = "123456789"; // Will be length 10 when copied to block
+ const size_t slen = 10;
+ const size_t num_blks = 10;
+ ACE_Message_Block b[num_blks];
+ size_t i;
+ int status = 0;
+
+ for (i = 0; i < num_blks; ++i)
+ {
+ b[i].init (slen);
+ b[i].copy (s);
+ }
+
+ // Test enqueueing single and chained blocks and be sure they end up with
+ // the proper enqueued block count and sizes. Then be sure they are dequeued
+ // in the proper order.
+ b[0].next (&b[1]);
+ b[1].next (&b[2]);
+ // b[3] and b[4] are unchained.
+ b[5].next (&b[6]);
+ b[6].next (&b[7]);
+ b[7].next (&b[8]);
+ // b[9] is unchained
+ q.enqueue_tail (&b[3]);
+ q.enqueue_tail (&b[4]);
+ int num = q.enqueue_head (&b[0]);
+ if (num != 5)
+ {
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("Chained enqueue expected 5; has %d\n"),
+ num));
+ status = -1;
+ }
+ num = q.enqueue_tail (&b[5]);
+ if (num != 9)
+ {
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("Chained enqueue expected 9; has %d\n"),
+ num));
+ status = -1;
+ }
+ num = q.enqueue_tail (&b[9]);
+ if (num != 10)
+ {
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("Chained enqueue expected 10; has %d\n"),
+ num));
+ status = -1;
+ }
+ size_t msgs, bytes;
+ msgs = q.message_count ();
+ bytes = q.message_bytes ();
+ if (msgs != 10 || bytes != 100)
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("Chained enqueue totals: %d msgs, %d bytes; ")
+ ACE_TEXT ("should be 10 msgs, 100 bytes\n"),
+ (int)msgs, (int)bytes));
+ status = -1;
+ }
+
+ // Now see if we can dequeue them, checking the order.
+ ACE_Time_Value nowait (ACE_OS::gettimeofday ());
+ ACE_Message_Block *bp;
+ int qstat;
+ for (i = 0; i < num_blks; ++i)
+ {
+ qstat = q.dequeue_head (bp, &nowait);
+ if (qstat == -1)
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("Checking chained blocks, pass %d: %p\n"),
+ (int)i, ACE_TEXT ("dequeue_head")));
+ status = -1;
+ }
+ else if (bp != &b[i])
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("Checking chained blocks, pass %d: ")
+ ACE_TEXT ("block out of order\n"),
+ (int)i));
+ status = -1;
+ }
+ }
+
+ if (status == 0)
+ ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Chained block test OK\n")));
+ return status;
+}
+
+static int
+single_thread_performance_test (int queue_type = 0)
+{
+ const char test_message[] =
+ "ACE_Message_Queue Test Message";
+ const ACE_TCHAR *message =
+ ACE_TEXT ("ACE_Message_Queue<ACE_NULL_SYNCH>, single thread");
+ int i = 0;
+
+ // Create a message queue.
+ ACE_Message_Queue_Base *msgq = 0;
+
+ if (queue_type == 0)
+ ACE_NEW_RETURN (msgq,
+ QUEUE,
+ -1);
+#if defined (ACE_VXWORKS)
+ else
+ {
+ ACE_NEW_RETURN (msgq,
+ ACE_Message_Queue_Vx (max_messages,
+ MAX_MESSAGE_SIZE),
+ -1);
+ message = "ACE_Message_Queue_Vx, single thread test";
+ }
+#elif defined (ACE_WIN32) && defined (ACE_HAS_WIN32_OVERLAPPED_IO)
+ else
+ {
+ ACE_NEW_RETURN (msgq,
+ ACE_Message_Queue_NT,
+ -1);
+ message = ACE_TEXT ("ACE_Message_Queue_NT, single thread test");
+ }
+#endif /* ACE_VXWORKS */
+
+ // Create the messages. Allocate off the heap in case messages
+ // is large relative to the amount of stack space available.
+ ACE_Message_Block **send_block = 0;
+ ACE_NEW_RETURN (send_block,
+ ACE_Message_Block *[max_messages],
+ -1);
+
+ for (i = 0; i < max_messages; ++i)
+ ACE_NEW_RETURN (send_block[i],
+ ACE_Message_Block (test_message,
+ MAX_MESSAGE_SIZE),
+ -1);
+
+ ACE_Message_Block **receive_block_p = 0;
+ ACE_NEW_RETURN (receive_block_p,
+ ACE_Message_Block *[max_messages],
+ -1);
+
+#if defined (ACE_VXWORKS)
+ // Set up blocks to receive the messages. Allocate these off the
+ // heap in case messages is large relative to the amount of
+ // stack space available.
+ ACE_Message_Block *receive_block = 0;
+ ACE_NEW_RETURN (receive_block,
+ ACE_Message_Block[max_messages],
+ -1);
+
+ for (i = 0; i < max_messages; ++i)
+ {
+ receive_block[i].init (MAX_MESSAGE_SIZE);
+
+ // For VxWorks Message Queues, the receive block pointer must be
+ // assigned. It will be used by dequeue_head ().
+ receive_block_p[i] = &receive_block[i];
+ }
+#endif /* ACE_VXWORKS */
+
+ timer->start ();
+
+ // Send/receive the messages.
+ for (i = 0; i < max_messages; ++i)
+ {
+ if (msgq->enqueue_tail (send_block[i]) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("enqueue")),
+ -1);
+
+ if (msgq->dequeue_head (receive_block_p[i]) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("dequeue_head")),
+ -1);
+ }
+
+ timer->stop ();
+
+ ACE_Time_Value tv;
+ timer->elapsed_time (tv);
+ ACE_DEBUG ((LM_INFO,
+ ACE_TEXT ("%s: %u messages took %u msec (%f msec/message)\n"),
+ message,
+ max_messages,
+ tv.msec (),
+ (double) tv.msec () / max_messages));
+ timer->reset ();
+
+ delete [] receive_block_p;
+#if defined (ACE_VXWORKS)
+ delete [] receive_block;
+#endif /* ACE_VXWORKS */
+
+ for (i = 0; i < max_messages; ++i)
+ delete send_block[i];
+ delete [] send_block;
+ delete msgq;
+
+ return 0;
+}
+
+static void *
+receiver (void *arg)
+{
+ Queue_Wrapper *queue_wrapper =
+ reinterpret_cast<Queue_Wrapper *> (arg);
+ int i;
+
+ ACE_Message_Block **receive_block_p = 0;
+ ACE_NEW_RETURN (receive_block_p,
+ ACE_Message_Block *[max_messages],
+ (void *) -1);
+
+#if defined (ACE_VXWORKS)
+ // Set up blocks to receive the messages. Allocate these off the
+ // heap in case messages is large relative to the amount of stack
+ // space available.
+ ACE_Message_Block *receive_block;
+ ACE_NEW_RETURN (receive_block,
+ ACE_Message_Block[max_messages],
+ (void *) -1);
+
+ for (i = 0; i < max_messages; ++i)
+ {
+ receive_block[i].init (MAX_MESSAGE_SIZE);
+
+ // For VxWorks Message Queues, the receive block pointer must be
+ // assigned. It will be used by <dequeue_head>.
+ receive_block_p[i] = &receive_block[i];
+ }
+#endif /* ACE_VXWORKS */
+
+ for (i = 0; i < max_messages; ++i)
+ if (queue_wrapper->q_->dequeue_head (receive_block_p[i]) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("dequeue_head")),
+ 0);
+ timer->stop ();
+
+ delete [] receive_block_p;
+#if defined (ACE_VXWORKS)
+ delete [] receive_block;
+#endif /* ACE_VXWORKS */
+
+ return 0;
+}
+
+static void *
+sender (void *arg)
+{
+ Queue_Wrapper *queue_wrapper =
+ reinterpret_cast<Queue_Wrapper *> (arg);
+ int i;
+
+ timer->start ();
+
+ // Send the messages.
+ for (i = 0; i < max_messages; ++i)
+ if (queue_wrapper->q_->
+ enqueue_tail (queue_wrapper->send_block_[i]) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("enqueue")),
+ 0);
+ return 0;
+}
+
+static
+int
+performance_test (int queue_type = 0)
+{
+ Queue_Wrapper queue_wrapper;
+ const ACE_TCHAR *message =
+ ACE_TEXT ("ACE_Message_Queue<ACE_SYNCH>");
+ int i = 0;
+
+ // Create the messages. Allocate off the heap in case messages is
+ // large relative to the amount of stack space available. Allocate
+ // it here instead of in the sender, so that we can delete it after
+ // the _receiver_ is done.
+ ACE_Message_Block **send_block = 0;
+ ACE_NEW_RETURN (send_block,
+ ACE_Message_Block *[max_messages],
+ -1);
+
+ for (i = 0; i < max_messages; ++i)
+ ACE_NEW_RETURN (send_block[i],
+ ACE_Message_Block (test_message,
+ MAX_MESSAGE_SIZE),
+ -1);
+
+ queue_wrapper.send_block_ = send_block;
+
+ if (queue_type == 0)
+ ACE_NEW_RETURN (queue_wrapper.q_,
+ SYNCH_QUEUE,
+ -1);
+#if defined (ACE_VXWORKS)
+ else
+ {
+ ACE_NEW_RETURN (queue_wrapper.q_,
+ ACE_Message_Queue_Vx (max_messages,
+ MAX_MESSAGE_SIZE),
+ -1);
+ message = "ACE_Message_Queue_Vx";
+ }
+#elif defined (ACE_WIN32) && defined (ACE_HAS_WIN32_OVERLAPPED_IO)
+ else
+ {
+ ACE_NEW_RETURN (queue_wrapper.q_,
+ ACE_Message_Queue_NT,
+ -1);
+ message = ACE_TEXT ("ACE_Message_Queue_NT");
+ }
+#endif /* ACE_VXWORKS */
+
+ if (ACE_Thread_Manager::instance ()->spawn ((ACE_THR_FUNC) sender,
+ &queue_wrapper,
+ THR_BOUND) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("spawning sender thread")),
+ -1);
+
+ if (ACE_Thread_Manager::instance ()->spawn ((ACE_THR_FUNC) receiver,
+ &queue_wrapper,
+ THR_BOUND) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("spawning receiver thread")),
+ -1);
+
+ ACE_Thread_Manager::instance ()->wait ();
+ ACE_Time_Value tv;
+ timer->elapsed_time (tv);
+ ACE_DEBUG ((LM_INFO, ACE_TEXT ("%s: %u messages took %u msec (%f msec/message)\n"),
+ message,
+ max_messages,
+ tv.msec (),
+ (double) tv.msec () / max_messages));
+ timer->reset ();
+
+ delete queue_wrapper.q_;
+ queue_wrapper.q_ = 0;
+
+ for (i = 0; i < max_messages; ++i)
+ delete send_block[i];
+ delete [] send_block;
+
+ return 0;
+}
+
+// Ensure that the timedout dequeue_head() sets errno code properly.
+
+static int
+timeout_test (void)
+{
+ SYNCH_QUEUE mq;
+ int status = 0;
+
+ if (!mq.is_empty ())
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("New queue is not empty!\n")));
+ status = 1;
+ }
+ else
+ {
+ ACE_Message_Block *b;
+ ACE_Time_Value tv (ACE_OS::gettimeofday ()); // Now
+
+ if (mq.dequeue_head (b, &tv) != -1)
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("Dequeued from empty queue!\n")));
+ status = 1;
+ }
+ else if (errno != EWOULDBLOCK)
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("Dequeue timeout should be EWOULDBLOCK, got")));
+ status = 1;
+ }
+ else
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("Timed dequeue test: OK\n")));
+ status = 0; // All is well
+ }
+ }
+
+ return status;
+}
+#endif /* ACE_HAS_THREADS */
+
+// Check to make sure that dequeue_prio() respects FIFO ordering.
+// @@ At some point, this function should be enhanced to do a more
+// thorough check...
+
+static int
+prio_test (void)
+{
+ const char S1[] = "first";
+ const char S2[] = "second";
+ const int PRIORITY = 50;
+ QUEUE mq;
+ int status;
+
+ ACE_Message_Block mb1 (S1, sizeof S1, PRIORITY);
+ ACE_Message_Block mb2 (S2, sizeof S2, PRIORITY);
+
+ mq.enqueue_prio (&mb1);
+ mq.enqueue_prio (&mb2);
+
+ ACE_Message_Block *mb1p;
+ ACE_Message_Block *mb2p;
+
+ mq.dequeue_prio (mb1p);
+ mq.dequeue_prio (mb2p);
+
+ ACE_DEBUG ((LM_DEBUG, "message 1 = %C\nmessage 2 = %C\n",
+ mb1p->rd_ptr (),
+ mb2p->rd_ptr ()));
+
+ if (ACE_OS_String::strcmp (mb1p->rd_ptr (), S1) == 0
+ && ACE_OS_String::strcmp (mb2p->rd_ptr (), S2) == 0)
+ status = 0;
+ else
+ status = 1;
+
+ return status;
+}
+
+static int
+close_test (void)
+{
+ int status = 0;
+
+ int flushed_messages;
+
+ QUEUE mq1;
+ flushed_messages = mq1.close ();
+
+ if (flushed_messages != 0)
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("Closing queue should flush 0 messages, close() reports - %d\n"),
+ flushed_messages ));
+ status = 1;
+ return status;
+ }
+
+ // There was a bug that return previous queue state instead of
+ // number of flushed messages. Thus, insert 2 messages != ACTIVATE
+ // queue state
+ ACE_Message_Block *pMB1;
+ ACE_Message_Block *pMB2;
+ ACE_NEW_NORETURN (pMB1, ACE_Message_Block (1));
+ ACE_NEW_NORETURN (pMB2, ACE_Message_Block (1));
+ QUEUE mq2;
+ mq2.enqueue_head (pMB1);
+ mq2.enqueue_head (pMB2);
+ flushed_messages = mq2.close ();
+
+ if (flushed_messages != 2)
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("Closing queue should flush 2 messages, close() reports - %d\n"),
+ flushed_messages ));
+ status = 1;
+ return status;
+ }
+ return status;
+}
+
+int
+run_main (int argc, ACE_TCHAR *argv[])
+{
+ ACE_START_TEST (ACE_TEXT ("Message_Queue_Test"));
+
+ if (argc == 2)
+ {
+ if (!ACE_OS::strcmp (argv[1], ACE_TEXT ("-?")))
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("%s/n"),
+ usage));
+ }
+ else
+ {
+ max_messages = ACE_OS::atoi (argv[1]);
+ }
+ }
+
+ int status = prio_test ();
+
+ // The iterator test occasionally causes a page fault or a hang on
+ // VxWorks.
+ if (status == 0)
+ status = iterator_test ();
+
+ ACE_NEW_RETURN (timer,
+ ACE_High_Res_Timer,
+ -1);
+
+ if (status == 0)
+ status = close_test ();
+
+#if defined (ACE_HAS_THREADS)
+ if (status == 0)
+ status = timeout_test ();
+
+ if (status == 0)
+ status = chained_block_test ();
+
+ if (status == 0)
+ status = single_thread_performance_test ();
+
+# if defined (ACE_VXWORKS) || defined (ACE_HAS_WIN32_OVERLAPPED_IO)
+ // Test ACE_Message_Queue_Vx. or ACE_Message_Queue_NT
+ if (status == 0)
+ status = single_thread_performance_test (1);
+# endif /* ACE_VXWORKS */
+
+ if (status == 0)
+ status = performance_test ();
+
+# if defined (ACE_VXWORKS) || defined (ACE_HAS_WIN32_OVERLAPPED_IO)
+ // Test ACE_Message_Queue_Vx or ACE_Message_Queue_NT
+ if (status == 0)
+ status = performance_test (1);
+# endif /* ACE_VXWORKS */
+
+ if (counting_test () != 0)
+ status = -1;
+#endif /* ACE_HAS_THREADS */
+
+ if (status != 0)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("test failed")));
+ delete timer;
+ timer = 0;
+
+
+
+ ACE_END_TEST;
+ return status;
+}
+