summaryrefslogtreecommitdiff
path: root/examples/ASX/Message_Queue
diff options
context:
space:
mode:
Diffstat (limited to 'examples/ASX/Message_Queue')
-rw-r--r--examples/ASX/Message_Queue/ASX_Message_Queue.mpc24
-rw-r--r--examples/ASX/Message_Queue/bounded_buffer.cpp140
-rw-r--r--examples/ASX/Message_Queue/buffer_stream.cpp311
-rw-r--r--examples/ASX/Message_Queue/priority_buffer.cpp145
4 files changed, 0 insertions, 620 deletions
diff --git a/examples/ASX/Message_Queue/ASX_Message_Queue.mpc b/examples/ASX/Message_Queue/ASX_Message_Queue.mpc
deleted file mode 100644
index 8488a7dc89f..00000000000
--- a/examples/ASX/Message_Queue/ASX_Message_Queue.mpc
+++ /dev/null
@@ -1,24 +0,0 @@
-// -*- MPC -*-
-// $Id$
-
-project(*Bounded_Buffer) : aceexe {
- exename = bounded_buffer
- Source_Files {
- bounded_buffer.cpp
- }
-}
-
-project(*Buffer_Stream) : aceexe {
- exename = buffer_stream
- Source_Files {
- buffer_stream.cpp
- }
-}
-
-project(*Priority_Buffer) : aceexe {
- exename = priority_buffer
- Source_Files {
- priority_buffer.cpp
- }
-}
-
diff --git a/examples/ASX/Message_Queue/bounded_buffer.cpp b/examples/ASX/Message_Queue/bounded_buffer.cpp
deleted file mode 100644
index ada4c6304d1..00000000000
--- a/examples/ASX/Message_Queue/bounded_buffer.cpp
+++ /dev/null
@@ -1,140 +0,0 @@
-// $Id$
-
-// This short program copies stdin to stdout via the use of an ASX
-// Message_Queue. It illustrates an implementation of the classic
-// "bounded buffer" program.
-
-#include "ace/Message_Queue.h"
-#include "ace/Thread_Manager.h"
-#include "ace/OS_NS_time.h"
-#include "ace/OS_NS_unistd.h"
-
-ACE_RCSID(Message_Queue, bounded_buffer, "$Id$")
-
-#if defined (ACE_HAS_THREADS)
-
-// The producer reads data from the stdin stream, creates a message,
-// and then queues the message in the message list, where it is
-// removed by the consumer thread. A 0-sized message is enqueued when
-// there is no more data to read. The consumer uses this as a flag to
-// know when to exit.
-
-static void *
-producer (ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue)
-{
- // Keep reading stdin, until we reach EOF.
-
- for (int n; ; )
- {
- // Allocate a new message.
- ACE_Message_Block *mb;
-
- ACE_NEW_RETURN (mb, ACE_Message_Block (BUFSIZ), 0);
-
- n = ACE_OS::read (ACE_STDIN, mb->wr_ptr (), mb->size ());
-
- if (n <= 0)
- {
- // Send a shutdown message to the other thread and exit.
- mb->length (0);
- if (msg_queue->enqueue_tail (mb) == -1)
- ACE_ERROR ((LM_ERROR,
- "(%t) %p\n",
- "put_next"));
- break;
- }
-
- // Send the message to the other thread.
- else
- {
- mb->msg_priority (n);
- mb->wr_ptr (n);
- if (msg_queue->enqueue_tail (mb) == -1)
- ACE_ERROR ((LM_ERROR,
- "(%t) %p\n",
- "put_next"));
- }
- }
-
- return 0;
-}
-
-// The consumer dequeues a message from the ACE_Message_Queue, writes
-// the message to the stderr stream, and deletes the message. The
-// producer sends a 0-sized message to inform the consumer to stop
-// reading and exit.
-
-static void *consumer (ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue)
-{
- int result = 0;
-
- // Keep looping, reading a message out of the queue, until we timeout
- // or get a message with a length == 0, which signals us to quit.
-
- for (;;)
- {
- ACE_Message_Block *mb;
-
- ACE_Time_Value timeout (ACE_OS::time (0) + 4, 0); // Wait for upto 4 seconds
-
- result = msg_queue->dequeue_head (mb, &timeout);
-
- if (result == -1)
- break;
-
- int length = mb->length ();
-
- if (length > 0)
- ACE_OS::write (ACE_STDOUT, mb->rd_ptr (), length);
-
- mb->release ();
-
- if (length == 0)
- break;
- }
-
- if (result == -1 && errno == EWOULDBLOCK)
- ACE_ERROR ((LM_ERROR,
- "(%t) %p\n%a",
- "timed out waiting for message",
- 1));
- return 0;
-}
-
-// Spawn off two threads that copy stdin to stdout.
-
-int
-main (int, char *[])
-{
- // Message list.
- ACE_Message_Queue<ACE_MT_SYNCH> msg_queue;
-
- if (ACE_Thread_Manager::instance ()->spawn
- (ACE_THR_FUNC (producer),
- (void *) &msg_queue,
- THR_NEW_LWP | THR_DETACHED) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "%p\n",
- "spawn"),
- 1);
- else if (ACE_Thread_Manager::instance ()->spawn
- (ACE_THR_FUNC (consumer),
- (void *) &msg_queue,
- THR_NEW_LWP | THR_DETACHED) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "%p\n",
- "spawn"),
- 1);
-
- // Wait for producer and consumer threads to exit.
- ACE_Thread_Manager::instance ()->wait ();
- return 0;
-}
-#else
-int
-main (int, char *[])
-{
- ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n"));
- return 0;
-}
-#endif /* ACE_HAS_THREADS */
diff --git a/examples/ASX/Message_Queue/buffer_stream.cpp b/examples/ASX/Message_Queue/buffer_stream.cpp
deleted file mode 100644
index b17d4a85d19..00000000000
--- a/examples/ASX/Message_Queue/buffer_stream.cpp
+++ /dev/null
@@ -1,311 +0,0 @@
-// $Id$
-
-// This short program copies stdin to stdout via the use of an ASX
-// Stream. It illustrates an implementation of the classic "bounded
-// buffer" program using an ASX Stream containing two Modules. Each
-// ACE_Module contains two Tasks. Each ACE_Task contains a
-// ACE_Message_Queue and a pointer to a ACE_Thread_Manager. Note how
-// the use of these reusable components reduces the reliance on global
-// variables, as compared with the bounded_buffer.C example.
-
-#include "ace/OS_NS_stdio.h"
-#include "ace/OS_NS_string.h"
-#include "ace/OS_NS_time.h"
-#include "ace/OS_NS_unistd.h"
-#include "ace/Service_Config.h"
-#include "ace/Stream.h"
-#include "ace/Module.h"
-#include "ace/Task.h"
-
-ACE_RCSID(Message_Queue, buffer_stream, "$Id$")
-
-#if defined (ACE_HAS_THREADS)
-
-typedef ACE_Stream<ACE_MT_SYNCH> MT_Stream;
-typedef ACE_Module<ACE_MT_SYNCH> MT_Module;
-typedef ACE_Task<ACE_MT_SYNCH> MT_Task;
-
-class Common_Task : public MT_Task
- // = TITLE
- // Methods that are common to the producer and consumer.
-{
-public:
- Common_Task (void) {}
- // ACE_Task hooks
- virtual int open (void * = 0);
- virtual int close (u_long = 0);
-};
-
-// Define the Producer interface.
-
-class Producer : public Common_Task
-{
-public:
- Producer (void) {}
-
- // Read data from stdin and pass to consumer.
- virtual int svc (void);
-};
-
-class Consumer : public Common_Task
- // = TITLE
- // Define the Consumer interface.
-{
-public:
- Consumer (void) {}
-
- virtual int put (ACE_Message_Block *mb,
- ACE_Time_Value *tv = 0);
- // Enqueue the message on the ACE_Message_Queue for subsequent
- // handling in the svc() method.
-
- virtual int svc (void);
- // Receive message from producer and print to stdout.
-
-private:
-
- ACE_Time_Value timeout_;
-};
-
-class Filter : public MT_Task
- // = TITLE
- // Defines a Filter that prepends a line number in front of each
- // line.
-{
-public:
- Filter (void): count_ (1) {}
-
- virtual int put (ACE_Message_Block *mb,
- ACE_Time_Value *tv = 0);
- // Change the size of the message before passing it downstream.
-
-private:
- size_t count_;
- // Count the number of lines passing through the filter.
-};
-
-// Spawn off a new thread.
-
-int
-Common_Task::open (void *)
-{
- if (this->activate (THR_NEW_LWP | THR_DETACHED) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "%p\n",
- "spawn"),
- -1);
- return 0;
-}
-
-int
-Common_Task::close (u_long exit_status)
-{
- ACE_DEBUG ((LM_DEBUG,
- "(%t) thread is exiting with status %d in module %s\n",
- exit_status,
- this->name ()));
-
- // Can do anything here that is required when a thread exits, e.g.,
- // storing thread-specific information in some other storage
- // location, etc.
- return 0;
-}
-
-// The Consumer reads data from the stdin stream, creates a message,
-// and then queues the message in the message list, where it is
-// removed by the consumer thread. A 0-sized message is enqueued when
-// there is no more data to read. The consumer uses this as a flag to
-// know when to exit.
-
-int
-Producer::svc (void)
-{
- // Keep reading stdin, until we reach EOF.
-
- for (int n; ; )
- {
- // Allocate a new message (add one to avoid nasty boundary
- // conditions).
-
- ACE_Message_Block *mb;
-
- ACE_NEW_RETURN (mb,
- ACE_Message_Block (BUFSIZ + 1),
- -1);
-
- n = ACE_OS::read (ACE_STDIN, mb->wr_ptr (), BUFSIZ);
-
- if (n <= 0)
- {
- // Send a shutdown message to the other thread and exit.
- mb->length (0);
-
- if (this->put_next (mb) == -1)
- ACE_ERROR ((LM_ERROR,
- "(%t) %p\n",
- "put_next"));
- break;
- }
-
- // Send the message to the other thread.
- else
- {
- mb->wr_ptr (n);
- // NUL-terminate the string (since we use strlen() on it
- // later).
- mb->rd_ptr ()[n] = '\0';
-
- if (this->put_next (mb) == -1)
- ACE_ERROR ((LM_ERROR,
- "(%t) %p\n",
- "put_next"));
- }
- }
-
- return 0;
-}
-
-// Simply enqueue the Message_Block into the end of the queue.
-
-int
-Consumer::put (ACE_Message_Block *mb, ACE_Time_Value *tv)
-{
- return this->putq (mb, tv);
-}
-
-// The consumer dequeues a message from the ACE_Message_Queue, writes
-// the message to the stderr stream, and deletes the message. The
-// Consumer sends a 0-sized message to inform the consumer to stop
-// reading and exit.
-
-int
-Consumer::svc (void)
-{
- int result = 0;
-
- // Keep looping, reading a message out of the queue, until we
- // timeout or get a message with a length == 0, which signals us to
- // quit.
-
- for (;;)
- {
- ACE_Message_Block *mb;
-
- // Wait for upto 4 seconds.
- this->timeout_.sec (ACE_OS::time (0) + 4);
-
- result = this->getq (mb, &this->timeout_);
-
- if (result == -1)
- break;
-
- int length = mb->length ();
-
- if (length > 0)
- ACE_OS::write (ACE_STDOUT,
- mb->rd_ptr (),
- ACE_OS::strlen (mb->rd_ptr ()));
-
- mb->release ();
-
- if (length == 0)
- break;
- }
-
- if (result == -1 && errno == EWOULDBLOCK)
- ACE_ERROR ((LM_ERROR,
- "(%t) %p\n%a",
- "timed out waiting for message",
- 1));
- return 0;
-}
-
-// The filter prepends a line number in front of each line.
-
-int
-Filter::put (ACE_Message_Block *mb,
- ACE_Time_Value *tv)
-{
- if (mb->length () == 0)
- return this->put_next (mb, tv);
- else
- {
- char buf[BUFSIZ];
-
- // Stash a copy of the buffer away.
- ACE_OS::strncpy (buf, mb->rd_ptr (), sizeof buf);
-
- // Increase the size of the buffer large enough that it will be
- // reallocated (in order to test the reallocation mechanisms).
-
- mb->size (mb->length () + BUFSIZ);
- mb->length (mb->size ());
-
- // Prepend the line count in front of the buffer.
- ACE_OS::sprintf (mb->rd_ptr (),
- ACE_SIZE_T_FORMAT_SPECIFIER": %s",
- this->count_++,
- buf);
- return this->put_next (mb, tv);
- }
-}
-
-// Main driver function.
-
-int
-main (int, char *argv[])
-{
- ACE_Service_Config daemon (argv[0]);
-
- // This Stream controls hierachically-related active objects.
- MT_Stream stream;
-
- MT_Module *pm;
- MT_Module *fm;
- MT_Module *cm;
-
- ACE_NEW_RETURN (cm,
- MT_Module ("Consumer",
- new Consumer),
- -1);
- ACE_NEW_RETURN (fm,
- MT_Module ("Filter",
- new Filter),
- -1);
- ACE_NEW_RETURN (pm,
- MT_Module ("Producer",
- new Producer),
- -1);
-
- // Create Consumer, Filter, and Producer Modules and push them onto
- // the Stream. All processing is performed in the Stream.
-
- if (stream.push (cm) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "%p\n",
- "push"),
- 1);
- else if (stream.push (fm) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "%p\n",
- "push"),
- 1);
- else if (stream.push (pm) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "%p\n",
- "push"),
- 1);
- // Barrier synchronization: wait for the threads to exit, then exit
- // ourselves.
- ACE_Thread_Manager::instance ()->wait ();
- return 0;
-}
-#else
-int
-main (int, char *[])
-{
- ACE_ERROR ((LM_ERROR,
- "threads not supported on this platform\n"));
- return 0;
-}
-#endif /* ACE_HAS_THREADS */
diff --git a/examples/ASX/Message_Queue/priority_buffer.cpp b/examples/ASX/Message_Queue/priority_buffer.cpp
deleted file mode 100644
index 26603c7dc64..00000000000
--- a/examples/ASX/Message_Queue/priority_buffer.cpp
+++ /dev/null
@@ -1,145 +0,0 @@
-// $Id$
-
-// This short program prints the contents of stdin to stdout sorted by
-// the length of each line via the use of an ASX Message_Queue. It
-// illustrates how priorities can be used for ACE Message_Queues.
-
-#include "ace/OS_NS_stdio.h"
-#include "ace/Malloc_Base.h" // To get ACE_Allocator
-#include "ace/Message_Queue.h"
-#include "ace/Read_Buffer.h"
-#include "ace/Thread_Manager.h"
-#include "ace/Service_Config.h"
-
-ACE_RCSID(Message_Queue, priority_buffer, "$Id$")
-
-#if defined (ACE_HAS_THREADS)
-
-// Global thread manager.
-static ACE_Thread_Manager thr_mgr;
-
-// Make the queue be capable of being *very* large.
-static const long max_queue = LONG_MAX;
-
-// The consumer dequeues a message from the ACE_Message_Queue, writes
-// the message to the stderr stream, and deletes the message. The
-// producer sends a 0-sized message to inform the consumer to stop
-// reading and exit.
-
-static void *
-consumer (ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue)
-{
- // Keep looping, reading a message out of the queue, until we
- // timeout or get a message with a length == 0, which signals us to
- // quit.
-
- for (;;)
- {
- ACE_Message_Block *mb;
-
- if (msg_queue->dequeue_head (mb) == -1)
- break;
-
- int length = mb->length ();
-
- if (length > 0)
- ACE_OS::puts (mb->rd_ptr ());
-
- // Free up the buffer memory and the Message_Block.
- ACE_Allocator::instance ()->free (mb->rd_ptr ());
- mb->release ();
-
- if (length == 0)
- break;
- }
-
- return 0;
-}
-
-// The producer reads data from the stdin stream, creates a message,
-// and then queues the message in the message list, where it is
-// removed by the consumer thread. A 0-sized message is enqueued when
-// there is no more data to read. The consumer uses this as a flag to
-// know when to exit.
-
-static void *
-producer (ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue)
-{
- ACE_Read_Buffer rb (ACE_STDIN);
-
- // Keep reading stdin, until we reach EOF.
-
- for (;;)
- {
- // Allocate a new buffer.
- char *buffer = rb.read ('\n');
-
- ACE_Message_Block *mb;
-
- if (buffer == 0)
- {
- // Send a 0-sized shutdown message to the other thread and
- // exit.
-
- ACE_NEW_RETURN (mb, ACE_Message_Block ((size_t) 0), 0);
-
- if (msg_queue->enqueue_tail (mb) == -1)
- ACE_ERROR ((LM_ERROR, "(%t) %p\n", "put_next"));
- break;
- }
-
- // Enqueue the message in priority order.
- else
- {
- // Allocate a new message, but have it "borrow" its memory
- // from the buffer.
- ACE_NEW_RETURN (mb, ACE_Message_Block (rb.size (),
- ACE_Message_Block::MB_DATA,
- 0,
- buffer),
- 0);
- mb->msg_priority (rb.size ());
- mb->wr_ptr (rb.size ());
-
- ACE_DEBUG ((LM_DEBUG,
- "enqueueing message of size %d\n",
- mb->msg_priority ()));
-
- // Enqueue in priority order.
- if (msg_queue->enqueue_prio (mb) == -1)
- ACE_ERROR ((LM_ERROR, "(%t) %p\n", "put_next"));
- }
- }
-
- // Now read all the items out in priority order (i.e., ordered by
- // the size of the lines!).
- consumer (msg_queue);
-
- return 0;
-}
-
-// Spawn off one thread that copies stdin to stdout in order of the
-// size of each line.
-
-int
-main (int, char *[])
-{
- // Message queue.
- ACE_Message_Queue<ACE_MT_SYNCH> msg_queue (max_queue);
-
- if (thr_mgr.spawn (ACE_THR_FUNC (producer), (void *) &msg_queue,
- THR_NEW_LWP | THR_DETACHED) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "spawn"), 1);
-
- // Wait for producer and consumer threads to exit.
- thr_mgr.wait ();
- return 0;
-}
-#else
-int
-main (int, char *[])
-{
- ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n"));
- return 0;
-}
-#endif /* ACE_HAS_THREADS */