summaryrefslogtreecommitdiff
path: root/ACE/examples/ASX/Message_Queue/bounded_buffer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ACE/examples/ASX/Message_Queue/bounded_buffer.cpp')
-rw-r--r--ACE/examples/ASX/Message_Queue/bounded_buffer.cpp140
1 files changed, 140 insertions, 0 deletions
diff --git a/ACE/examples/ASX/Message_Queue/bounded_buffer.cpp b/ACE/examples/ASX/Message_Queue/bounded_buffer.cpp
new file mode 100644
index 00000000000..2cdc50f2116
--- /dev/null
+++ b/ACE/examples/ASX/Message_Queue/bounded_buffer.cpp
@@ -0,0 +1,140 @@
+// $Id$
+
+// This short program copies stdin to stdout via the use of an ASX
+// Message_Queue. It illustrates an implementation of the classic
+// "bounded buffer" program.
+
+#include "ace/Message_Queue.h"
+#include "ace/Thread_Manager.h"
+#include "ace/OS_NS_time.h"
+#include "ace/OS_NS_unistd.h"
+
+ACE_RCSID(Message_Queue, bounded_buffer, "$Id$")
+
+#if defined (ACE_HAS_THREADS)
+
+// The producer reads data from the stdin stream, creates a message,
+// and then queues the message in the message list, where it is
+// removed by the consumer thread. A 0-sized message is enqueued when
+// there is no more data to read. The consumer uses this as a flag to
+// know when to exit.
+
+static void *
+producer (ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue)
+{
+ // Keep reading stdin, until we reach EOF.
+
+ for (int n; ; )
+ {
+ // Allocate a new message.
+ ACE_Message_Block *mb;
+
+ ACE_NEW_RETURN (mb, ACE_Message_Block (BUFSIZ), 0);
+
+ n = ACE_OS::read (ACE_STDIN, mb->wr_ptr (), mb->size ());
+
+ if (n <= 0)
+ {
+ // Send a shutdown message to the other thread and exit.
+ mb->length (0);
+ if (msg_queue->enqueue_tail (mb) == -1)
+ ACE_ERROR ((LM_ERROR,
+ "(%t) %p\n",
+ "put_next"));
+ break;
+ }
+
+ // Send the message to the other thread.
+ else
+ {
+ mb->msg_priority (n);
+ mb->wr_ptr (n);
+ if (msg_queue->enqueue_tail (mb) == -1)
+ ACE_ERROR ((LM_ERROR,
+ "(%t) %p\n",
+ "put_next"));
+ }
+ }
+
+ return 0;
+}
+
+// The consumer dequeues a message from the ACE_Message_Queue, writes
+// the message to the stderr stream, and deletes the message. The
+// producer sends a 0-sized message to inform the consumer to stop
+// reading and exit.
+
+static void *consumer (ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue)
+{
+ int result = 0;
+
+ // Keep looping, reading a message out of the queue, until we timeout
+ // or get a message with a length == 0, which signals us to quit.
+
+ for (;;)
+ {
+ ACE_Message_Block *mb;
+
+ ACE_Time_Value timeout (ACE_OS::time (0) + 4, 0); // Wait for upto 4 seconds
+
+ result = msg_queue->dequeue_head (mb, &timeout);
+
+ if (result == -1)
+ break;
+
+ int length = mb->length ();
+
+ if (length > 0)
+ ACE_OS::write (ACE_STDOUT, mb->rd_ptr (), length);
+
+ mb->release ();
+
+ if (length == 0)
+ break;
+ }
+
+ if (result == -1 && errno == EWOULDBLOCK)
+ ACE_ERROR ((LM_ERROR,
+ "(%t) %p\n%a",
+ "timed out waiting for message",
+ 1));
+ return 0;
+}
+
+// Spawn off two threads that copy stdin to stdout.
+
+int
+ACE_TMAIN (int, ACE_TCHAR *[])
+{
+ // Message list.
+ ACE_Message_Queue<ACE_MT_SYNCH> msg_queue;
+
+ if (ACE_Thread_Manager::instance ()->spawn
+ (ACE_THR_FUNC (producer),
+ (void *) &msg_queue,
+ THR_NEW_LWP | THR_DETACHED) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "spawn"),
+ 1);
+ else if (ACE_Thread_Manager::instance ()->spawn
+ (ACE_THR_FUNC (consumer),
+ (void *) &msg_queue,
+ THR_NEW_LWP | THR_DETACHED) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "spawn"),
+ 1);
+
+ // Wait for producer and consumer threads to exit.
+ ACE_Thread_Manager::instance ()->wait ();
+ return 0;
+}
+#else
+int
+ACE_TMAIN (int, ACE_TCHAR *[])
+{
+ ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n"));
+ return 0;
+}
+#endif /* ACE_HAS_THREADS */