diff options
Diffstat (limited to 'ACE/examples/ASX/Message_Queue/buffer_stream.cpp')
-rw-r--r-- | ACE/examples/ASX/Message_Queue/buffer_stream.cpp | 311 |
1 files changed, 311 insertions, 0 deletions
diff --git a/ACE/examples/ASX/Message_Queue/buffer_stream.cpp b/ACE/examples/ASX/Message_Queue/buffer_stream.cpp new file mode 100644 index 00000000000..b1f918cfef5 --- /dev/null +++ b/ACE/examples/ASX/Message_Queue/buffer_stream.cpp @@ -0,0 +1,311 @@ +// $Id$ + +// This short program copies stdin to stdout via the use of an ASX +// Stream. It illustrates an implementation of the classic "bounded +// buffer" program using an ASX Stream containing two Modules. Each +// ACE_Module contains two Tasks. Each ACE_Task contains a +// ACE_Message_Queue and a pointer to a ACE_Thread_Manager. Note how +// the use of these reusable components reduces the reliance on global +// variables, as compared with the bounded_buffer.C example. + +#include "ace/OS_main.h" +#include "ace/OS_NS_stdio.h" +#include "ace/OS_NS_string.h" +#include "ace/OS_NS_time.h" +#include "ace/OS_NS_unistd.h" +#include "ace/Service_Config.h" +#include "ace/Stream.h" +#include "ace/Module.h" +#include "ace/Task.h" + +ACE_RCSID(Message_Queue, buffer_stream, "$Id$") + +#if defined (ACE_HAS_THREADS) + +typedef ACE_Stream<ACE_MT_SYNCH> MT_Stream; +typedef ACE_Module<ACE_MT_SYNCH> MT_Module; +typedef ACE_Task<ACE_MT_SYNCH> MT_Task; + +class Common_Task : public MT_Task + // = TITLE + // Methods that are common to the producer and consumer. +{ +public: + Common_Task (void) {} + // ACE_Task hooks + virtual int open (void * = 0); + virtual int close (u_long = 0); +}; + +// Define the Producer interface. + +class Producer : public Common_Task +{ +public: + Producer (void) {} + + // Read data from stdin and pass to consumer. + virtual int svc (void); +}; + +class Consumer : public Common_Task + // = TITLE + // Define the Consumer interface. +{ +public: + Consumer (void) {} + + virtual int put (ACE_Message_Block *mb, + ACE_Time_Value *tv = 0); + // Enqueue the message on the ACE_Message_Queue for subsequent + // handling in the svc() method. + + virtual int svc (void); + // Receive message from producer and print to stdout. + +private: + + ACE_Time_Value timeout_; +}; + +class Filter : public MT_Task + // = TITLE + // Defines a Filter that prepends a line number in front of each + // line. +{ +public: + Filter (void): count_ (1) {} + + virtual int put (ACE_Message_Block *mb, + ACE_Time_Value *tv = 0); + // Change the size of the message before passing it downstream. + +private: + size_t count_; + // Count the number of lines passing through the filter. +}; + +// Spawn off a new thread. + +int +Common_Task::open (void *) +{ + if (this->activate (THR_NEW_LWP | THR_DETACHED) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("spawn")), + -1); + return 0; +} + +int +Common_Task::close (u_long exit_status) +{ + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) thread is exiting with status %d in module %s\n"), + exit_status, + this->name ())); + + // Can do anything here that is required when a thread exits, e.g., + // storing thread-specific information in some other storage + // location, etc. + return 0; +} + +// The Consumer reads data from the stdin stream, creates a message, +// and then queues the message in the message list, where it is +// removed by the consumer thread. A 0-sized message is enqueued when +// there is no more data to read. The consumer uses this as a flag to +// know when to exit. + +int +Producer::svc (void) +{ + // Keep reading stdin, until we reach EOF. + + for (int n; ; ) + { + // Allocate a new message (add one to avoid nasty boundary + // conditions). + + ACE_Message_Block *mb = 0; + + ACE_NEW_RETURN (mb, + ACE_Message_Block (BUFSIZ + 1), + -1); + + n = ACE_OS::read (ACE_STDIN, mb->wr_ptr (), BUFSIZ); + + if (n <= 0) + { + // Send a shutdown message to the other thread and exit. + mb->length (0); + + if (this->put_next (mb) == -1) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("put_next"))); + break; + } + + // Send the message to the other thread. + else + { + mb->wr_ptr (n); + // NUL-terminate the string (since we use strlen() on it + // later). + mb->rd_ptr ()[n] = '\0'; + + if (this->put_next (mb) == -1) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("put_next"))); + } + } + + return 0; +} + +// Simply enqueue the Message_Block into the end of the queue. + +int +Consumer::put (ACE_Message_Block *mb, ACE_Time_Value *tv) +{ + return this->putq (mb, tv); +} + +// The consumer dequeues a message from the ACE_Message_Queue, writes +// the message to the stderr stream, and deletes the message. The +// Consumer sends a 0-sized message to inform the consumer to stop +// reading and exit. + +int +Consumer::svc (void) +{ + int result = 0; + + // Keep looping, reading a message out of the queue, until we + // timeout or get a message with a length == 0, which signals us to + // quit. + + for (;;) + { + ACE_Message_Block *mb = 0; + + // Wait for upto 4 seconds. + this->timeout_.sec (ACE_OS::time (0) + 4); + + result = this->getq (mb, &this->timeout_); + + if (result == -1) + break; + + int length = mb->length (); + + if (length > 0) + ACE_OS::write (ACE_STDOUT, + mb->rd_ptr (), + ACE_OS::strlen (mb->rd_ptr ())); + + mb->release (); + + if (length == 0) + break; + } + + if (result == -1 && errno == EWOULDBLOCK) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%t) %p\n%a"), + ACE_TEXT ("timed out waiting for message"), + 1)); + return 0; +} + +int +Filter::put (ACE_Message_Block *mb, + ACE_Time_Value *tv) +{ + if (mb->length () == 0) + return this->put_next (mb, tv); + else + { + char buf[BUFSIZ]; + + // Stash a copy of the buffer away. + ACE_OS::strncpy (buf, mb->rd_ptr (), sizeof buf); + + // Increase the size of the buffer large enough that it will be + // reallocated (in order to test the reallocation mechanisms). + + mb->size (mb->length () + BUFSIZ); + mb->length (mb->size ()); + + // Prepend the line count in front of the buffer. + ACE_OS::sprintf (mb->rd_ptr (), + ACE_SIZE_T_FORMAT_SPECIFIER + ": %s", + this->count_++, + buf); + return this->put_next (mb, tv); + } +} + +// Main driver function. + +int +ACE_TMAIN (int, ACE_TCHAR *argv[]) +{ + ACE_Service_Config daemon (argv[0]); + + // This Stream controls hierachically-related active objects. + MT_Stream stream; + + MT_Module *pm = 0; + MT_Module *fm = 0; + MT_Module *cm = 0; + + ACE_NEW_RETURN (cm, + MT_Module (ACE_TEXT ("Consumer"), + new Consumer), + -1); + ACE_NEW_RETURN (fm, + MT_Module (ACE_TEXT ("Filter"), + new Filter), + -1); + ACE_NEW_RETURN (pm, + MT_Module (ACE_TEXT ("Producer"), + new Producer), + -1); + + // Create Consumer, Filter, and Producer Modules and push them onto + // the Stream. All processing is performed in the Stream. + + if (stream.push (cm) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("push")), + 1); + else if (stream.push (fm) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("push")), + 1); + else if (stream.push (pm) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("push")), + 1); + // Barrier synchronization: wait for the threads to exit, then exit + // ourselves. + ACE_Thread_Manager::instance ()->wait (); + return 0; +} +#else +int +main (int, char *[]) +{ + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("threads not supported on this platform\n"))); + return 0; +} +#endif /* ACE_HAS_THREADS */ |