summaryrefslogtreecommitdiff
path: root/ACE/examples/ASX/Message_Queue/priority_buffer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ACE/examples/ASX/Message_Queue/priority_buffer.cpp')
-rw-r--r--ACE/examples/ASX/Message_Queue/priority_buffer.cpp145
1 files changed, 145 insertions, 0 deletions
diff --git a/ACE/examples/ASX/Message_Queue/priority_buffer.cpp b/ACE/examples/ASX/Message_Queue/priority_buffer.cpp
new file mode 100644
index 00000000000..db60a33bcae
--- /dev/null
+++ b/ACE/examples/ASX/Message_Queue/priority_buffer.cpp
@@ -0,0 +1,145 @@
+// $Id$
+
+// This short program prints the contents of stdin to stdout sorted by
+// the length of each line via the use of an ASX Message_Queue. It
+// illustrates how priorities can be used for ACE Message_Queues.
+
+#include "ace/OS_NS_stdio.h"
+#include "ace/Malloc_Base.h" // To get ACE_Allocator
+#include "ace/Message_Queue.h"
+#include "ace/Read_Buffer.h"
+#include "ace/Thread_Manager.h"
+#include "ace/Service_Config.h"
+
+ACE_RCSID(Message_Queue, priority_buffer, "$Id$")
+
+#if defined (ACE_HAS_THREADS)
+
+// Global thread manager.
+static ACE_Thread_Manager thr_mgr;
+
+// Make the queue be capable of being *very* large.
+static const long max_queue = LONG_MAX;
+
+// The consumer dequeues a message from the ACE_Message_Queue, writes
+// the message to the stderr stream, and deletes the message. The
+// producer sends a 0-sized message to inform the consumer to stop
+// reading and exit.
+
+static void *
+consumer (ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue)
+{
+ // Keep looping, reading a message out of the queue, until we
+ // timeout or get a message with a length == 0, which signals us to
+ // quit.
+
+ for (;;)
+ {
+ ACE_Message_Block *mb;
+
+ if (msg_queue->dequeue_head (mb) == -1)
+ break;
+
+ int length = mb->length ();
+
+ if (length > 0)
+ ACE_OS::puts (mb->rd_ptr ());
+
+ // Free up the buffer memory and the Message_Block.
+ ACE_Allocator::instance ()->free (mb->rd_ptr ());
+ mb->release ();
+
+ if (length == 0)
+ break;
+ }
+
+ return 0;
+}
+
+// The producer reads data from the stdin stream, creates a message,
+// and then queues the message in the message list, where it is
+// removed by the consumer thread. A 0-sized message is enqueued when
+// there is no more data to read. The consumer uses this as a flag to
+// know when to exit.
+
+static void *
+producer (ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue)
+{
+ ACE_Read_Buffer rb (ACE_STDIN);
+
+ // Keep reading stdin, until we reach EOF.
+
+ for (;;)
+ {
+ // Allocate a new buffer.
+ char *buffer = rb.read ('\n');
+
+ ACE_Message_Block *mb;
+
+ if (buffer == 0)
+ {
+ // Send a 0-sized shutdown message to the other thread and
+ // exit.
+
+ ACE_NEW_RETURN (mb, ACE_Message_Block ((size_t) 0), 0);
+
+ if (msg_queue->enqueue_tail (mb) == -1)
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n", "put_next"));
+ break;
+ }
+
+ // Enqueue the message in priority order.
+ else
+ {
+ // Allocate a new message, but have it "borrow" its memory
+ // from the buffer.
+ ACE_NEW_RETURN (mb, ACE_Message_Block (rb.size (),
+ ACE_Message_Block::MB_DATA,
+ 0,
+ buffer),
+ 0);
+ mb->msg_priority (rb.size ());
+ mb->wr_ptr (rb.size ());
+
+ ACE_DEBUG ((LM_DEBUG,
+ "enqueueing message of size %d\n",
+ mb->msg_priority ()));
+
+ // Enqueue in priority order.
+ if (msg_queue->enqueue_prio (mb) == -1)
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n", "put_next"));
+ }
+ }
+
+ // Now read all the items out in priority order (i.e., ordered by
+ // the size of the lines!).
+ consumer (msg_queue);
+
+ return 0;
+}
+
+// Spawn off one thread that copies stdin to stdout in order of the
+// size of each line.
+
+int
+ACE_TMAIN (int, ACE_TCHAR *[])
+{
+ // Message queue.
+ ACE_Message_Queue<ACE_MT_SYNCH> msg_queue (max_queue);
+
+ if (thr_mgr.spawn (ACE_THR_FUNC (producer), (void *) &msg_queue,
+ THR_NEW_LWP | THR_DETACHED) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "spawn"), 1);
+
+ // Wait for producer and consumer threads to exit.
+ thr_mgr.wait ();
+ return 0;
+}
+#else
+int
+ACE_TMAIN (int, ACE_TCHAR *[])
+{
+ ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n"));
+ return 0;
+}
+#endif /* ACE_HAS_THREADS */