diff options
author | nobody <nobody@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1999-10-11 15:10:34 +0000 |
---|---|---|
committer | nobody <nobody@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1999-10-11 15:10:34 +0000 |
commit | 23111468052a9f4ec607b9b10123f62004348216 (patch) | |
tree | f81e272d0159b645be87a876da75ae4911e4dfaa /examples/ASX/Message_Queue/buffer_stream.cpp | |
parent | d71e1419e3c009e237fec7a91e832be8fc0fe31a (diff) | |
download | ATCD-TAO-1_0_6.tar.gz |
This commit was manufactured by cvs2svn to create tag 'TAO-1_0_6'.TAO-1_0_6
Diffstat (limited to 'examples/ASX/Message_Queue/buffer_stream.cpp')
-rw-r--r-- | examples/ASX/Message_Queue/buffer_stream.cpp | 308 |
1 files changed, 0 insertions, 308 deletions
diff --git a/examples/ASX/Message_Queue/buffer_stream.cpp b/examples/ASX/Message_Queue/buffer_stream.cpp deleted file mode 100644 index 02911c02a46..00000000000 --- a/examples/ASX/Message_Queue/buffer_stream.cpp +++ /dev/null @@ -1,308 +0,0 @@ -// $Id$ - -// This short program copies stdin to stdout via the use of an ASX -// Stream. It illustrates an implementation of the classic "bounded -// buffer" program using an ASX Stream containing two Modules. Each -// ACE_Module contains two Tasks. Each ACE_Task contains a -// ACE_Message_Queue and a pointer to a ACE_Thread_Manager. Note how -// the use of these reusable components reduces the reliance on global -// variables, as compared with the bounded_buffer.C example. - -#include "ace/Synch.h" -#include "ace/Service_Config.h" -#include "ace/Stream.h" -#include "ace/Module.h" -#include "ace/Task.h" - -ACE_RCSID(Message_Queue, buffer_stream, "$Id$") - -#if defined (ACE_HAS_THREADS) - -typedef ACE_Stream<ACE_MT_SYNCH> MT_Stream; -typedef ACE_Module<ACE_MT_SYNCH> MT_Module; -typedef ACE_Task<ACE_MT_SYNCH> MT_Task; - -class Common_Task : public MT_Task - // = TITLE - // Methods that are common to the producer and consumer. -{ -public: - Common_Task (void) {} - // ACE_Task hooks - virtual int open (void * = 0); - virtual int close (u_long = 0); -}; - -// Define the Producer interface. - -class Producer : public Common_Task -{ -public: - Producer (void) {} - - // Read data from stdin and pass to consumer. - virtual int svc (void); -}; - -class Consumer : public Common_Task - // = TITLE - // Define the Consumer interface. -{ -public: - Consumer (void) {} - - virtual int put (ACE_Message_Block *mb, - ACE_Time_Value *tv = 0); - // Enqueue the message on the ACE_Message_Queue for subsequent - // handling in the svc() method. - - virtual int svc (void); - // Receive message from producer and print to stdout. - -private: - - ACE_Time_Value timeout_; -}; - -class Filter : public MT_Task - // = TITLE - // Defines a Filter that prepends a line number in front of each - // line. -{ -public: - Filter (void): count_ (1) {} - - virtual int put (ACE_Message_Block *mb, - ACE_Time_Value *tv = 0); - // Change the size of the message before passing it downstream. - -private: - size_t count_; - // Count the number of lines passing through the filter. -}; - -// Spawn off a new thread. - -int -Common_Task::open (void *) -{ - if (this->activate (THR_NEW_LWP | THR_DETACHED) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "%p\n", - "spawn"), - -1); - return 0; -} - -int -Common_Task::close (u_long exit_status) -{ - ACE_DEBUG ((LM_DEBUG, - "(%t) thread is exiting with status %d in module %s\n", - exit_status, - this->name ())); - - // Can do anything here that is required when a thread exits, e.g., - // storing thread-specific information in some other storage - // location, etc. - return 0; -} - -// The Consumer reads data from the stdin stream, creates a message, -// and then queues the message in the message list, where it is -// removed by the consumer thread. A 0-sized message is enqueued when -// there is no more data to read. The consumer uses this as a flag to -// know when to exit. - -int -Producer::svc (void) -{ - // Keep reading stdin, until we reach EOF. - - for (int n; ; ) - { - // Allocate a new message (add one to avoid nasty boundary - // conditions). - - ACE_Message_Block *mb; - - ACE_NEW_RETURN (mb, - ACE_Message_Block (BUFSIZ + 1), - -1); - - n = ACE_OS::read (ACE_STDIN, mb->wr_ptr (), BUFSIZ); - - if (n <= 0) - { - // Send a shutdown message to the other thread and exit. - mb->length (0); - - if (this->put_next (mb) == -1) - ACE_ERROR ((LM_ERROR, - "(%t) %p\n", - "put_next")); - break; - } - - // Send the message to the other thread. - else - { - mb->wr_ptr (n); - // NUL-terminate the string (since we use strlen() on it - // later). - mb->rd_ptr ()[n] = '\0'; - - if (this->put_next (mb) == -1) - ACE_ERROR ((LM_ERROR, - "(%t) %p\n", - "put_next")); - } - } - - return 0; -} - -// Simply enqueue the Message_Block into the end of the queue. - -int -Consumer::put (ACE_Message_Block *mb, ACE_Time_Value *tv) -{ - return this->putq (mb, tv); -} - -// The consumer dequeues a message from the ACE_Message_Queue, writes -// the message to the stderr stream, and deletes the message. The -// Consumer sends a 0-sized message to inform the consumer to stop -// reading and exit. - -int -Consumer::svc (void) -{ - int result = 0; - - // Keep looping, reading a message out of the queue, until we - // timeout or get a message with a length == 0, which signals us to - // quit. - - for (;;) - { - ACE_Message_Block *mb; - - // Wait for upto 4 seconds. - this->timeout_.sec (ACE_OS::time (0) + 4); - - result = this->getq (mb, &this->timeout_); - - if (result == -1) - break; - - int length = mb->length (); - - if (length > 0) - ACE_OS::write (ACE_STDOUT, - mb->rd_ptr (), - ACE_OS::strlen (mb->rd_ptr ())); - - mb->release (); - - if (length == 0) - break; - } - - if (result == -1 && errno == EWOULDBLOCK) - ACE_ERROR ((LM_ERROR, - "(%t) %p\n%a", - "timed out waiting for message", - 1)); - return 0; -} - -// The filter prepends a line number in front of each line. - -int -Filter::put (ACE_Message_Block *mb, - ACE_Time_Value *tv) -{ - if (mb->length () == 0) - return this->put_next (mb, tv); - else - { - char buf[BUFSIZ]; - - // Stash a copy of the buffer away. - ACE_OS::strncpy (buf, mb->rd_ptr (), sizeof buf); - - // Increase the size of the buffer large enough that it will be - // reallocated (in order to test the reallocation mechanisms). - - mb->size (mb->length () + BUFSIZ); - mb->length (mb->size ()); - - // Prepend the line count in front of the buffer. - ACE_OS::sprintf (mb->rd_ptr (), - "%d: %s", - this->count_++, - buf); - return this->put_next (mb, tv); - } -} - -// Main driver function. - -int -main (int, char *argv[]) -{ - ACE_Service_Config daemon (argv[0]); - - // This Stream controls hierachically-related active objects. - MT_Stream stream; - - MT_Module *pm; - MT_Module *fm; - MT_Module *cm; - - ACE_NEW_RETURN (cm, - MT_Module ("Consumer", - new Consumer), - -1); - ACE_NEW_RETURN (fm, - MT_Module ("Filter", - new Filter), - -1); - ACE_NEW_RETURN (pm, - MT_Module ("Producer", - new Producer), - -1); - - // Create Consumer, Filter, and Producer Modules and push them onto - // the Stream. All processing is performed in the Stream. - - if (stream.push (cm) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "%p\n", - "push"), - 1); - else if (stream.push (fm) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "%p\n", - "push"), - 1); - else if (stream.push (pm) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "%p\n", - "push"), - 1); - // Barrier synchronization: wait for the threads to exit, then exit - // ourselves. - ACE_Thread_Manager::instance ()->wait (); - return 0; -} -#else -int -main (int, char *[]) -{ - ACE_ERROR ((LM_ERROR, - "threads not supported on this platform\n")); - return 0; -} -#endif /* ACE_HAS_THREADS */ |