diff options
author | levine <levine@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1996-10-21 21:41:34 +0000 |
---|---|---|
committer | levine <levine@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1996-10-21 21:41:34 +0000 |
commit | a5fdebc5f6375078ec1763850a4ca23ec7fe6458 (patch) | |
tree | bcf0a25c3d45a209a6e3ac37b233a4812f29c732 /tests/Priority_Buffer_Test.cpp | |
download | ATCD-a5fdebc5f6375078ec1763850a4ca23ec7fe6458.tar.gz |
Initial revision
Diffstat (limited to 'tests/Priority_Buffer_Test.cpp')
-rw-r--r-- | tests/Priority_Buffer_Test.cpp | 158 |
1 files changed, 158 insertions, 0 deletions
diff --git a/tests/Priority_Buffer_Test.cpp b/tests/Priority_Buffer_Test.cpp new file mode 100644 index 00000000000..1b117a4de9a --- /dev/null +++ b/tests/Priority_Buffer_Test.cpp @@ -0,0 +1,158 @@ +// ============================================================================ +// @(#)Priority_Buffer_Test.cpp 1.1 10/18/96 + +// +// = LIBRARY +// tests +// +// = FILENAME +// Priority_Buffer_Test.cpp +// +// = DESCRIPTION +// This is a simple test to illustrate the priority mechanism of +// ACE Message_Queues. The producer uses an ASX Message_Queue to +// enqueue a bunch of messages with different priorities which +// are then dequeued by the consumer. +// +// = AUTHOR +// Prashant Jain and Doug Schmidt +// +// ============================================================================ + +#include "ace/Log_Msg.h" +#include "ace/Message_Queue.h" +#include "ace/Thread_Manager.h" +#include "ace/Service_Config.h" +#include "test_config.h" + +#if defined (ACE_HAS_THREADS) + +// Global thread manager. +static ACE_Thread_Manager thr_mgr; +static int count = 0; + +// Make the queue be capable of being *very* large. +static const long max_queue = LONG_MAX; + +// The consumer dequeues a message from the ACE_Message_Queue, writes +// the message to the stderr stream, and deletes the message. The +// producer sends a 0-sized message to inform the consumer to stop +// reading and exit. + +static void * +consumer (ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue) +{ + int cur_priority = 27; + int local_count = 0; + + // Keep looping, reading a message out of the queue, until we + // get a message with a length == 0, which signals us to quit. + for (char c = 'z'; ; c--) + { + ACE_Message_Block *mb = 0; + + int result = msg_queue->dequeue_head (mb); + + if (result == -1) + break; + + local_count++; + + int length = mb->length (); + ACE_ASSERT (mb->msg_priority () < cur_priority); + cur_priority = mb->msg_priority (); + + if (length > 0) + ACE_ASSERT (c == *mb->rd_ptr ()); + + // Free up the buffer memory and the Message_Block. Note that + // the destructor of Message Block will delete the the actual + // buffer. + delete mb; + + if (length == 0) + break; + } + ACE_ASSERT (local_count == count); + return 0; +} + +// The producer reads data from the stdin stream, creates a message, +// and then queues the message in the message list, where it is +// removed by the consumer thread. A 0-sized message is enqueued when +// there is no more data to read. The consumer uses this as a flag to +// know when to exit. + +static void * +producer (ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue) +{ + // Insert thread into thr_mgr. + ACE_Thread_Control thread_control (&thr_mgr); + ACE_NEW_THREAD; + + ACE_Message_Block *mb = 0; + + for (char c = 'a'; c <= 'z'; c++) + { + count++; + + // Allocate a new message + + ACE_NEW_RETURN (mb, ACE_Message_Block (1), 0); + *mb->rd_ptr () = c; + + // Set the priority. + mb->msg_priority (count); + mb->wr_ptr (1); + + // Enqueue in priority order. + if (msg_queue->enqueue (mb) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "put_next"), 0); + } + + // Now send a 0-sized shutdown message to the other thread + ACE_NEW_RETURN (mb, ACE_Message_Block ((size_t) 0), 0); + + if (msg_queue->enqueue_tail (mb) == -1) + ACE_ERROR ((LM_ERROR, "(%t) %p\n", "put_next")); + + count++; + + // Now read all the items out in priority order (i.e., ordered by + // the size of the lines!). + consumer (msg_queue); + + // The destructor of ACE_Thread_Control removes the exiting thread + // from the thr_mgr automatically. + return 0; +} + +// Spawn off one thread that copies stdin to stdout in order of the +// size of each line. + +int +main (int argc, char *argv[]) +{ + ACE_START_TEST; + + // Message queue. + ACE_Message_Queue<ACE_MT_SYNCH> msg_queue (max_queue); + + if (thr_mgr.spawn (ACE_THR_FUNC (producer), (void *) &msg_queue, + THR_NEW_LWP | THR_DETACHED) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "spawn"), 1); + + // Wait for producer and consumer threads to exit. + thr_mgr.wait (); + + ACE_END_TEST; + return 0; +} +#else +int +main (int, char *[]) +{ + ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n")); + return 0; +} +#endif /* ACE_HAS_THREADS */ |