From ebc019e4ce13412acb20d929d13899ffa1730e5e Mon Sep 17 00:00:00 2001 From: jcej Date: Mon, 12 Oct 1998 23:22:26 +0000 Subject: *** empty log message *** --- docs/ACE-tutorials.html | 3 + docs/tutorials/014/EndTask.h | 80 ++++++++++++++ docs/tutorials/014/Makefile | 69 ++++++++++++ docs/tutorials/014/Task.cpp | 211 +++++++++++++++++++++++++++++++++++ docs/tutorials/014/Task.h | 68 ++++++++++++ docs/tutorials/014/page01.html | 38 +++++++ docs/tutorials/014/page02.html | 91 +++++++++++++++ docs/tutorials/014/page03.html | 243 +++++++++++++++++++++++++++++++++++++++++ docs/tutorials/014/page04.html | 108 ++++++++++++++++++ docs/tutorials/014/page05.html | 215 ++++++++++++++++++++++++++++++++++++ docs/tutorials/014/page06.html | 28 +++++ docs/tutorials/014/stream.cpp | 175 +++++++++++++++++++++++++++++ 12 files changed, 1329 insertions(+) create mode 100644 docs/tutorials/014/EndTask.h create mode 100644 docs/tutorials/014/Makefile create mode 100644 docs/tutorials/014/Task.cpp create mode 100644 docs/tutorials/014/Task.h create mode 100644 docs/tutorials/014/page01.html create mode 100644 docs/tutorials/014/page02.html create mode 100644 docs/tutorials/014/page03.html create mode 100644 docs/tutorials/014/page04.html create mode 100644 docs/tutorials/014/page05.html create mode 100644 docs/tutorials/014/page06.html create mode 100644 docs/tutorials/014/stream.cpp (limited to 'docs') diff --git a/docs/ACE-tutorials.html b/docs/ACE-tutorials.html index adc584e2254..73c3af3ca99 100644 --- a/docs/ACE-tutorials.html +++ b/docs/ACE-tutorials.html @@ -26,6 +26,9 @@ links provide further information on this topic.

o Developing New Tutorials +
o Frequently Made Mistakes (FMM) + diff --git a/docs/tutorials/014/EndTask.h b/docs/tutorials/014/EndTask.h new file mode 100644 index 00000000000..e4ab823f1f9 --- /dev/null +++ b/docs/tutorials/014/EndTask.h @@ -0,0 +1,80 @@ + +// $Id$ + +// EndTask.h +// +// Tutorial regarding a way to use ACE_Stream. +// +// written by bob mcwhirter (bob@netwrench.com) +// +// + +#ifndef ENDTASK_H +#define ENDTASK_H + +#include "Task.h" + +// When you setup a Stream and push your modules on, +// there are two additional modules that go unseen +// by the user. +// +// The Stream pushes on a Stream_Head in front of +// your first module, and a Stream_Tail behind your +// last module. +// +// If your put() a message to the Stream Tail, it +// assumes you did so in error. This simple EndTask +// class allows you to push a message to it and just +// have it safely Go Away. +// +// All this Task does is release the Message_Block +// and return 0. It's a suitable black-hole. + + +class EndTask : public Task +{ + +public: + + typedef Task inherited; + + EndTask(const char *nameOfTask) : + inherited(nameOfTask, 0) { + + // when we get open()'d, it with 0 threads + // since there is actually no processing to do. + + cerr << __LINE__ << " " << __FILE__ << endl; + }; + + virtual int open(void *) + { + cerr << __LINE__ << " " << __FILE__ << endl; + return 0; + } + + virtual int open(void) + { + cerr << __LINE__ << " " << __FILE__ << endl; + return 0; + } + + virtual ~EndTask(void) { + }; + + virtual int put(ACE_Message_Block *message, + ACE_Time_Value *timeout) { + + cerr << __LINE__ << " " << __FILE__ << endl; + ACE_UNUSED_ARG(timeout); + + // we don't have anything to do, so + // release() the message. + ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s EndTask::put() -- releasing Message_Block\n", this->nameOfTask())); + message->release(); + return 0; + }; + +}; + +#endif // ENDTASK_H diff --git a/docs/tutorials/014/Makefile b/docs/tutorials/014/Makefile new file mode 100644 index 00000000000..5a8d8891382 --- /dev/null +++ b/docs/tutorials/014/Makefile @@ -0,0 +1,69 @@ + +# $Id$ + +#---------------------------------------------------------------------------- +# Local macros +#---------------------------------------------------------------------------- + +BIN = stream + +FILES = Task + +BUILD = $(VBIN) + +SRC = $(addsuffix .cpp,$(BIN)) +SRC += $(addsuffix .cpp,$(FILES)) + +HDR = *.h + +#---------------------------------------------------------------------------- +# Include macros and targets +#---------------------------------------------------------------------------- + +include $(ACE_ROOT)/include/makeinclude/wrapper_macros.GNU +include $(ACE_ROOT)/include/makeinclude/macros.GNU +include $(ACE_ROOT)/include/makeinclude/rules.common.GNU +include $(ACE_ROOT)/include/makeinclude/rules.nonested.GNU +include $(ACE_ROOT)/include/makeinclude/rules.lib.GNU +include $(ACE_ROOT)/include/makeinclude/rules.bin.GNU +include $(ACE_ROOT)/include/makeinclude/rules.local.GNU + +#---------------------------------------------------------------------------- +# Local targets +#---------------------------------------------------------------------------- + +HTML : # + ../combine *.html + +rename : # + for i in *.cxx ; do \ + n=`expr "$$i" : "\(.*\).cxx"` ;\ + mv $$i $$n.cpp ;\ + done + +Indent : # + for i in $(SRC) $(HDR) ; do \ + indent -npsl -l80 -fca -fc1 -cli0 -cdb -ts2 -bl -bli0 < $$i | \ + sed -e 's/: :/::/g' \ + -e 's/^.*\(public:\)/\1/' \ + -e 's/^.*\(protected:\)/\1/' \ + -e 's/^.*\(private:\)/\1/' \ + -e 's/:\(public\)/ : \1/' \ + -e 's/:\(protected\)/ : \1/' \ + -e 's/:\(private\)/ : \1/' \ + -e 's/ / /g' \ + > $$i~ ;\ + mv $$i~ $$i ;\ + done + +Depend : depend + perl ../007/fix.Makefile + +.depend : # + touch .depend + +#---------------------------------------------------------------------------- +# Dependencies +#---------------------------------------------------------------------------- + +include .depend diff --git a/docs/tutorials/014/Task.cpp b/docs/tutorials/014/Task.cpp new file mode 100644 index 00000000000..4dc77b6e03b --- /dev/null +++ b/docs/tutorials/014/Task.cpp @@ -0,0 +1,211 @@ + +// $Id$ + +// Task.cxx +// +// Tutorial regarding a way to use ACE_Stream. +// +// written by bob mcwhirter (bob@netwrench.com) +// +// + +#include + +#include "Task.h" + +Task::Task(const char * nameOfTask, + int numberOfThreads) + : d_numberOfThreads(numberOfThreads), + d_barrier(numberOfThreads) +{ + // Just initialize our name, number of threads, and barrier. + + ACE_OS::strcpy(d_nameOfTask, nameOfTask); +} + +Task::~Task(void) +{ + ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::~Task() -- once per Task\n", d_nameOfTask)); +} + +int Task::open(void *arg) +{ + ACE_UNUSED_ARG(arg); + + ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::open() -- once per Task\n", d_nameOfTask)); + + // call ACE_Task::activate() to spawn the threads using + // our Task::svc() as the function to be run. + + // FMM -- Frequently Made Mistake -- + // + // If you specify the flag THR_DETACHED when activating the + // Task, you will get an assert() violation during close(), + // since the Task waits for all of its threads to rejoin. + // + + return this->activate(THR_NEW_LWP, + d_numberOfThreads); +} + +int Task::put(ACE_Message_Block *message, + ACE_Time_Value *timeout) +{ + // ACE_Stream uses the put() method of Tasks to send messages. + // This defaultly does nothing. Here we link our put() method + // directly to our putq() method, so that Messages put() to us + // will appear in the Message_Queue that is checked by the + // service threads. + + return this->putq(message, timeout); +} + +int Task::close(u_long flags) +{ + + // When the Stream closes the Module, the Module then close()'s the Task + // and passing a value of (1) as the flag. + + // When a service thread exits, it calls close() with a value that is not + // (1). + + // We use this fact to tell the difference between closing a service thread, + // and closing the main Task itself. + + if (flags == 1) { + + // The Module has asked to close the main Task. + + ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::close() -- flags == 1 -- once per Task\n", d_nameOfTask)); + + // We create a Message_Block... + + ACE_Message_Block *hangupBlock = new ACE_Message_Block(); + + // And make it of the type MB_HANGUP. + + hangupBlock->msg_type(ACE_Message_Block::MB_HANGUP); + + // We then send this Block into the Message_Queue to be seen by the + // service threads. + + // Once again we duplicate() the Block as send it off... + + if (this->putq(hangupBlock->duplicate()) == -1) { + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "Task::close() putq"), -1); + } + + // ..and we're free to release() our copy of it. + + hangupBlock->release(); + + // Now, all we have to do is wait() for the service threads to all + // exit. This is where using THR_DETACHED in the activate() method + // will come back to haunt you. + + // The Stream waits until this returns before attempting to remove + // the next Module/Task group in the Stream. This allows for an + // orderly shutting down of the Stream. + + return this->wait(); + + + } else { + + ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::close() -- flags != 1 -- once per servicing thread\n", d_nameOfTask)); + + // This is where we can clean up any mess left over by each service thread. + // In this Task, there is nothing to do. + + } + + return 0; + +} + +int Task::svc(void) +{ + + // This is the function that our service threads run once they are spawned. + + ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::svc() -- once per servicing thread\n", d_nameOfTask)); + + // First, we wait until all of our peer service threads have arrived + // at this point also. + + d_barrier.wait(); + + ACE_Message_Block *messageBlock; + + while (1) { + + // And now we loop almost infinitely. + + // getq() will block until a Message_Block is available to be read, + // or an error occurs. + + if ( this->getq(messageBlock, 0) == -1) { + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "Task::svc() getq"), -1); + } + + if (messageBlock->msg_type() == ACE_Message_Block::MB_HANGUP) { + + // If the Message_Block is of type MB_HANGUP, then we're being asked + // to shut down nicely. + + ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::svc() -- HANGUP block received\n", d_nameOfTask)); + + // So, we duplicate the Block, and put it back into the Message_Queue, + // in case there are some more peer service threads still running. + + if (this->putq(messageBlock->duplicate()) == -1) { + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "Task::svc() putq"), -1); + } + + // We release our copy of the Block. + messageBlock->release(); + + // And we break out of the nearly infinitely loop, and + // head towards close() ourselves. + break; + } + + // If we're here, then we've received a Message_Block that was + // not informing us to quit, so we're assuming it's a valid + // meaningful Block. + + ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::svc() -- Normal block received\n", d_nameOfTask)); + + // We grab the read-pointer from the Block, and display it through a DEBUG statement. + + ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::svc() -- %s\n", d_nameOfTask, messageBlock->rd_ptr() )); + + // We pretend that this takes to time to process the Block. + // If you're on a fast machine, you might have to raise this + // value to actually witness different threads handling + // blocks for each Task. + + ACE_OS::sleep (ACE_Time_Value (0, 250)); + + // Since we're part of a Stream, we duplicate the Block, and + // send it on to the next Task. + + if (put_next(messageBlock->duplicate()) == -1) { + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "Task::svc() put_next"), -1); + } + + // And then we release our copy of it. + + messageBlock->release(); + + } + + return 0; + +} + + +const char * Task::nameOfTask(void) const +{ + return d_nameOfTask; +} diff --git a/docs/tutorials/014/Task.h b/docs/tutorials/014/Task.h new file mode 100644 index 00000000000..e4797f49892 --- /dev/null +++ b/docs/tutorials/014/Task.h @@ -0,0 +1,68 @@ + +// $Id$ + +// Task.h +// +// Tutorial regarding a way to use ACE_Stream. +// +// written by bob mcwhirter (bob@netwrench.com) +// +// + +#ifndef TASK_H +#define TASK_H + +#include +#include + +// Always typedef when possible. + +typedef ACE_Task Task_Base; + +class Task : public Task_Base +{ + +public: + + typedef Task_Base inherited; + // This is just good form. + + Task(const char *nameOfTask, + int numberOfThreads); + // Initialize our Task with a name, + // and number of threads to spawn. + + virtual ~Task(void); + + virtual int open(void *arg); + // This is provided to prevent compiler complaints + // about hidden virtual functions. + + virtual int close(u_long flags); + // This closes down the Task and all service threads. + + virtual int put(ACE_Message_Block *message, + ACE_Time_Value *timeout); + // This is the interface that ACE_Stream uses to + // communicate with our Task. + + virtual int svc(void); + // This is the actual service loop each of the service + // threads iterates through. + + const char *nameOfTask(void) const; + // Returns the name of this Task. + +private: + + int d_numberOfThreads; + char d_nameOfTask[64]; + + ACE_Barrier d_barrier; + // Simple Barrier to make sure all of our service + // threads have entered their loop before accepting + // any messages. +}; + + +#endif // TASK_H diff --git a/docs/tutorials/014/page01.html b/docs/tutorials/014/page01.html new file mode 100644 index 00000000000..ac4a0ea6d65 --- /dev/null +++ b/docs/tutorials/014/page01.html @@ -0,0 +1,38 @@ + + + + + ACE Tutorial 014 + + + +
ACE Tutorial 014
+ +
ACE_Stream Tutorial, Of Sorts
+ +

+


+ +

ACE_Stream is handy when you have several ACE_Task objects +that you would like to link together. + +

An intermediate class you we will deal with is the ACE_Module. + +

The basic plan is to wrap your Task into a Module, push +the Module onto the Stream. Do this for each Task, + and then inject Message_Blocks into the Stream. + +

Each Task then processes the Message_Block, and forwards +it on to the next Task in the Stream. + +

If you are not already familiar with Message_Blocks and Message_Queues, +I highly suggest that you check out Tutorials 10-13. + +

Streams can be used for both downstream and upstream movement of messages. Used +this way mirrors closely the way System V STREAMS work. But you don't have to use them +bidirectionally. In this tutorial, we only use one direction of the Stream. Down. + +

This tutorial is contributed by Bob McWhirter (bob@netwrench.com) +

+


+
[Tutorial Index] [Continue This Tutorial]
diff --git a/docs/tutorials/014/page02.html b/docs/tutorials/014/page02.html new file mode 100644 index 00000000000..9dd05c3a0b5 --- /dev/null +++ b/docs/tutorials/014/page02.html @@ -0,0 +1,91 @@ + + + + + ACE Tutorial 014 + + + +
ACE Tutorial 014
+ +
ACE_Stream Tutorial, Of Sorts
+ +

+


+

+You find pretty soon that anytime you work with ACE_Task<> you + have to create a derivative. The Task.h header simply provides + that derivative with the overrides we'll need in our application. +

+


+
+// $Id$
+
+// Task.h
+//
+// Tutorial regarding a way to use ACE_Stream.
+//
+// written by bob mcwhirter (bob@netwrench.com)
+//
+//
+
+#ifndef TASK_H
+#define TASK_H
+
+#include <ace/Task.h>
+#include <ace/Synch.h>
+
+// Always typedef when possible.
+
+typedef ACE_Task<ACE_MT_SYNCH> Task_Base;
+
+class Task : public Task_Base
+{
+
+public:
+
+  typedef Task_Base inherited;
+  // This is just good form.
+
+  Task(const char *nameOfTask,
+       int numberOfThreads);
+  // Initialize our Task with a name,
+  // and number of threads to spawn.
+
+  virtual ~Task(void);
+
+  virtual int open(void *arg);
+  // This is provided to prevent compiler complaints
+  // about hidden virtual functions.
+
+  virtual int close(u_long flags);
+  // This closes down the Task and all service threads.
+
+  virtual int put(ACE_Message_Block *message,
+		  ACE_Time_Value *timeout);
+  // This is the interface that ACE_Stream uses to 
+  // communicate with our Task.
+
+  virtual int svc(void);
+  // This is the actual service loop each of the service
+  // threads iterates through.
+
+  const char *nameOfTask(void) const;
+  // Returns the name of this Task.
+
+private:
+
+  int d_numberOfThreads;
+  char d_nameOfTask[64];
+
+  ACE_Barrier d_barrier;
+  // Simple Barrier to make sure all of our service
+  // threads have entered their loop before accepting
+  // any messages.
+};
+
+
+#endif // TASK_H
+
+


+
[Tutorial Index] [Continue This Tutorial]
diff --git a/docs/tutorials/014/page03.html b/docs/tutorials/014/page03.html new file mode 100644 index 00000000000..a0edf26f328 --- /dev/null +++ b/docs/tutorials/014/page03.html @@ -0,0 +1,243 @@ + + + + + ACE Tutorial 014 + + + +
ACE Tutorial 014
+ +
ACE_Stream Tutorial, Of Sorts
+ +

+


+

+Before we get to main() let's take a look at the Task implementation. + While we've overridden several methods, the real work is done in + the close() and svc() methods. +

+Notice how close() figures out if it is being called by the shutdown + of the ACE_Stream or by the exit of svc(). The magic here is + provided by the flags parameter. By handling the stream + shutdown in this way, we don't have to do anything strange in + svc(). We also don't end up with extra hangup messages in the + queue when the dust all settles down. +

+Like our other tutorials, svc() looks for a hangup and processes data. +

+


+
+// $Id$
+
+// Task.cxx
+//
+// Tutorial regarding a way to use ACE_Stream.
+//
+// written by bob mcwhirter (bob@netwrench.com)
+//
+//
+
+#include <ace/Message_Block.h>
+
+#include "Task.h"
+
+Task::Task(const char * nameOfTask,
+	   int numberOfThreads)
+  : d_numberOfThreads(numberOfThreads),
+    d_barrier(numberOfThreads)
+{
+  // Just initialize our name, number of threads, and barrier.
+
+  ACE_OS::strcpy(d_nameOfTask, nameOfTask);
+}
+
+Task::~Task(void)
+{
+  ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::~Task() -- once per Task\n", d_nameOfTask));
+}
+
+int Task::open(void *arg) 
+{
+  ACE_UNUSED_ARG(arg);
+
+  ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::open() -- once per Task\n", d_nameOfTask));
+  
+  // call ACE_Task::activate() to spawn the threads using
+  // our Task::svc() as the function to be run.
+
+  // FMM -- Frequently Made Mistake --
+  //  
+  // If you specify the flag THR_DETACHED when activating the
+  // Task, you will get an assert() violation during close(),
+  // since the Task waits for all of its threads to rejoin.
+  // 
+
+  return this->activate(THR_NEW_LWP,
+			d_numberOfThreads);
+}
+
+int Task::put(ACE_Message_Block *message,
+	      ACE_Time_Value *timeout)
+{
+  // ACE_Stream uses the put() method of Tasks to send messages.
+  // This defaultly does nothing.  Here we link our put() method
+  // directly to our putq() method, so that Messages put() to us
+  // will appear in the Message_Queue that is checked by the
+  // service threads.
+  
+  return this->putq(message, timeout);
+}
+
+int Task::close(u_long flags)
+{
+
+  // When the Stream closes the Module, the Module then close()'s the Task
+  // and passing a value of (1) as the flag.
+
+  // When a service thread exits, it calls close() with a value that is not
+  // (1).
+
+  // We use this fact to tell the difference between closing a service thread,
+  // and closing the main Task itself.
+
+  if (flags == 1) {
+
+    // The Module has asked to close the main Task.
+
+    ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::close() -- flags == 1 -- once per Task\n", d_nameOfTask));
+
+    // We create a Message_Block...
+
+    ACE_Message_Block *hangupBlock = new ACE_Message_Block();
+
+    // And make it of the type MB_HANGUP.  
+
+    hangupBlock->msg_type(ACE_Message_Block::MB_HANGUP);
+
+    // We then send this Block into the Message_Queue to be seen by the 
+    // service threads.
+
+    // Once again we duplicate() the Block as send it off...
+    
+    if (this->putq(hangupBlock->duplicate()) == -1) {
+      ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "Task::close() putq"), -1);
+    }
+    
+    // ..and we're free to release() our copy of it.
+
+    hangupBlock->release();
+
+    // Now, all we have to do is wait() for the service threads to all 
+    // exit.  This is where using THR_DETACHED in the activate() method
+    // will come back to haunt you.
+
+    // The Stream waits until this returns before attempting to remove
+    // the next Module/Task group in the Stream.  This allows for an
+    // orderly shutting down of the Stream.
+
+    return this->wait();
+
+
+  } else {
+
+    ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::close() -- flags != 1 -- once per servicing thread\n", d_nameOfTask));
+
+    // This is where we can clean up any mess left over by each service thread.
+    // In this Task, there is nothing to do.
+
+  }
+
+  return 0;
+
+}
+
+int Task::svc(void)
+{
+
+  // This is the function that our service threads run once they are spawned.
+
+  ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::svc() -- once per servicing thread\n", d_nameOfTask));
+
+  // First, we wait until all of our peer service threads have arrived
+  // at this point also.
+
+  d_barrier.wait();
+
+  ACE_Message_Block *messageBlock;
+
+  while (1) {
+
+    // And now we loop almost infinitely.
+
+    // getq() will block until a Message_Block is available to be read,
+    // or an error occurs.
+
+    if ( this->getq(messageBlock, 0) == -1) {
+      ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "Task::svc() getq"), -1);
+    }
+
+    if (messageBlock->msg_type() == ACE_Message_Block::MB_HANGUP) {
+      
+      // If the Message_Block is of type MB_HANGUP, then we're being asked
+      // to shut down nicely.
+
+      ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::svc() -- HANGUP block received\n", d_nameOfTask));
+
+      // So, we duplicate the Block, and put it back into the Message_Queue,
+      // in case there are some more peer service threads still running.
+
+      if (this->putq(messageBlock->duplicate()) == -1) {
+	ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "Task::svc() putq"), -1);
+      }
+
+      // We release our copy of the Block.
+      messageBlock->release();
+
+      // And we break out of the nearly infinitely loop, and
+      // head towards close() ourselves.
+      break;
+    }
+
+    // If we're here, then we've received a Message_Block that was 
+    // not informing us to quit, so we're assuming it's a valid
+    // meaningful Block.
+
+    ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::svc() -- Normal block received\n", d_nameOfTask));
+
+    // We grab the read-pointer from the Block, and display it through a DEBUG statement.
+
+    ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::svc() -- %s\n", d_nameOfTask, messageBlock->rd_ptr() ));
+
+    // We pretend that this takes to time to process the Block.
+    // If you're on a fast machine, you might have to raise this
+    // value to actually witness different threads handling
+    // blocks for each Task.
+
+    ACE_OS::sleep (ACE_Time_Value (0, 250));
+
+    // Since we're part of a Stream, we duplicate the Block, and 
+    // send it on to the next Task.
+
+    if (put_next(messageBlock->duplicate()) == -1) {
+      ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "Task::svc() put_next"), -1);
+    }
+    
+    // And then we release our copy of it.
+
+    messageBlock->release();
+
+  }
+
+  return 0;
+
+}
+
+
+const char * Task::nameOfTask(void) const
+{
+  return d_nameOfTask;
+}
+
+


+
[Tutorial Index] [Continue This Tutorial]
diff --git a/docs/tutorials/014/page04.html b/docs/tutorials/014/page04.html new file mode 100644 index 00000000000..c173178dbc2 --- /dev/null +++ b/docs/tutorials/014/page04.html @@ -0,0 +1,108 @@ + + + + + ACE Tutorial 014 + + + +
ACE Tutorial 014
+ +
ACE_Stream Tutorial, Of Sorts
+ +

+


+

+As stated in the comments below, the default action of the task at the + stream tail is to treat any received data as an error. In our + implementation it will often happen that data gets through to + the tail. How, then, do we handle this without creating an + error condition? Simple: Create a custom Task for use as the + stream tail that doesn't consider it an error to receive data. +

+Read on... +

+


+
+// $Id$
+
+// EndTask.h
+//
+// Tutorial regarding a way to use ACE_Stream.
+//
+// written by bob mcwhirter (bob@netwrench.com)
+//
+//
+
+#ifndef ENDTASK_H
+#define ENDTASK_H
+
+#include "Task.h"
+
+// When you setup a Stream and push your modules on,
+// there are two additional modules that go unseen
+// by the user.
+//
+// The Stream pushes on a Stream_Head in front of
+// your first module, and a Stream_Tail behind your
+// last module.
+//
+// If your put() a message to the Stream Tail, it 
+// assumes you did so in error. This simple EndTask
+// class allows you to push a message to it and just 
+// have it safely Go Away.
+//
+// All this Task does is release the Message_Block
+// and return 0.  It's a suitable black-hole.
+
+
+class EndTask : public Task
+{
+
+public:
+
+  typedef Task inherited;
+
+  EndTask(const char *nameOfTask) :
+    inherited(nameOfTask, 0) { 
+
+    // when we get open()'d, it with 0 threads
+    // since there is actually no processing to do.
+
+	cerr << __LINE__ << " " << __FILE__ << endl;
+  };
+
+  virtual int open(void *)
+  {
+	cerr << __LINE__ << " " << __FILE__ << endl;
+	return 0;
+  }
+
+  virtual int open(void)
+  {
+	cerr << __LINE__ << " " << __FILE__ << endl;
+	return 0;
+  }
+
+  virtual ~EndTask(void) {
+  };
+
+  virtual int put(ACE_Message_Block *message,
+		  ACE_Time_Value *timeout) {
+
+	cerr << __LINE__ << " " << __FILE__ << endl;
+    ACE_UNUSED_ARG(timeout);
+
+    // we don't have anything to do, so
+    // release() the message.
+    ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s EndTask::put() -- releasing Message_Block\n", this->nameOfTask()));
+    message->release();
+    return 0;
+  };
+
+};
+
+#endif // ENDTASK_H
+
+


+
[Tutorial Index] [Continue This Tutorial]
diff --git a/docs/tutorials/014/page05.html b/docs/tutorials/014/page05.html new file mode 100644 index 00000000000..c654dd69a83 --- /dev/null +++ b/docs/tutorials/014/page05.html @@ -0,0 +1,215 @@ + + + + + ACE Tutorial 014 + + + +
ACE Tutorial 014
+ +
ACE_Stream Tutorial, Of Sorts
+ +

+


+

+Now we come to main(). In the previous task-chain tutorial + every thread pool had to have the same number of threads. This + time around, we leverage the construction method of ACE_Stream + and ACE_Module to customize the thread-pool size in each + ACE_Task of the stream. +

+Remember EndTask from the previous page? We create one here and push + it into the stream to take care of cleaning up the messages. + Technically, we could have replaced the default Tail task + created by the ACE framework but it seems to make more sense to + just push our "tail" onto the stream like the other tasks. The + caveat to this method is that you must be sure you don't push() + any other Modules behind the EndTask! +

+Once the stream of modules containing tasks is all setup then we can + put() some data into the stream for processing. The clever use + of Task::close() makes shutting downt the stream easier than + ever. No messing with hangup messages at the application level, + just close() when you're done! What could be simpler? +

+


+
+
+// $Id$
+
+// stream.cxx
+//
+// Tutorial regarding a way to use ACE_Stream.
+//
+// written by bob mcwhirter (bob@netwrench.com)
+//
+//
+
+#include "Task.h"
+#include "EndTask.h"
+// This is our specialized ACE_Task.
+
+#include <ace/Module.h>
+#include <ace/Stream.h>
+// These are the neccessary ACE headers.
+
+
+typedef ACE_Module<ACE_MT_SYNCH> Module;
+typedef ACE_Stream<ACE_MT_SYNCH> Stream;
+// Just to avoid a lot of typing, typedefs
+// are generally a good idea.
+
+int main(int argc, char *argv[])
+{
+  cerr << __LINE__ << endl;
+
+  int numberOfMessages = argc > 1 ? ACE_OS::atoi(argv[1]) : 3;
+  // unless otherwise specified, just send three messages
+  // down the stream.
+
+  Stream theStream;
+  // the ACE_Stream itself.
+
+  cerr << __LINE__ << endl;
+
+  // Now, we instantiate 4 different Tasks.  These do not
+  // need to be all the same class, but they do need to
+  // all derrive from the same flavor of ACE_Task.
+  //
+  // Also, we instantiate a fifth end-cap Task to clean
+  // up Message_Blocks as they reach the end.
+
+  Task *taskOne;
+  Task *taskTwo;
+  Task *taskThree;
+  Task *taskFour;
+  Task *taskEnd;
+
+  // Out Task's take two arguments: a name, and the number
+  // of threads to dedicate to the task.
+
+  taskOne = new Task("Task No. 1", 1);
+  taskTwo = new Task("Task No. 2", 3);
+  taskThree = new Task("Task No. 3", 7);
+  taskFour = new Task("Task No. 4", 1);
+
+  // Our EndTask only takes 1 argument, as it actually
+  // doesn't spawn any threads for processing.
+
+  taskEnd = new EndTask("End Task");
+
+  Module *moduleOne;
+  Module *moduleTwo;
+  Module *moduleThree;
+  Module *moduleFour;
+  Module *moduleEnd;
+
+  // ACE_Stream accepts ACE_Modules, which are simply a pair of
+  // ACE_Tasks.  One is dedicated for writing, while the other
+  // is dedicated to reading.  Think of the writing side as
+  // downstream, and the reading side as upstream.
+  //
+  // We're only working with a unidirection Stream today,
+  // so we'll only actually install a Task into the write
+  // side of the module, effectively downstream.
+
+  cerr << __LINE__ << endl;
+  moduleOne = new Module("Module No. 1", taskOne);
+  moduleTwo = new Module("Module No. 2", taskTwo);
+  moduleThree = new Module("Module No. 3", taskThree);
+  moduleFour = new Module("Module No. 4", taskFour);
+  moduleEnd = new Module("Module End", taskEnd);
+
+  cerr << __LINE__ << endl;
+  // Now we push the Modules onto the Stream.
+  // Pushing adds the module to the head, or 
+  // otherwise prepends it to whatever modules
+  // are already installed.
+
+  // So, you need to push() the modules on -backwards-
+  // from our viewpoint.
+
+  if (theStream.push(moduleEnd) == -1) {
+           ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), -1);
+  } 
+
+  cerr << __LINE__ << endl;
+  if (theStream.push(moduleFour) == -1) {
+        ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), -1);
+  }
+
+  // As we push a Module onto the Stream, it gets opened.
+  // When a Module open()s, it opens the Tasks that it contains.
+  //
+  // Since we cannot provide an argument to this embedded
+  // call to open(), we supplied specified the number of
+  // threads in the constructor of our Tasks.
+
+  if (theStream.push(moduleThree) == -1) {
+        ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), -1);
+  }
+
+  if (theStream.push(moduleTwo) == -1) {
+        ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), -1);
+  }
+
+  if (theStream.push(moduleOne) == -1) {
+        ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), -1);
+  }
+
+  cerr << __LINE__ << endl;
+  // Now that the Modules are open, the Tasks threads should
+  // be launching and entering their svc() loop, so we send
+  // some messages down the Stream.
+
+  int sent = 1;
+
+  ACE_Message_Block *message;
+
+  while (sent <= numberOfMessages) {
+
+    // First, create ourselves a Message_Block.
+    // see Tutorials 10-13 for more information
+    // about Message_Blocks and Message_Queues.
+
+    message = new ACE_Message_Block(128);
+
+    // Now, we grab the write-pointer from the Block,
+    // and sprintf() our text into it.
+
+    ACE_OS::sprintf(message->wr_ptr(), "Message No. %d", sent);
+
+    // All we have to do now is drop the Message_Block
+    // into the Stream.
+
+    // It is always a good idea to duplicate() a Message_Block
+    // when you put it into any Message_Queue, as then
+    // you can always be allowed to release() your copy
+    // without worry.
+
+    if (theStream.put(message->duplicate(), 0) == -1) {
+      ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "put"), -1);
+    }
+
+    message->release();
+    ++sent;
+  }
+
+  // Now that we've sent our Message_Blocks, close down
+  // the Stream.
+  //
+  // The Stream will automagically delete the Modules and
+  // the contained Tasks.  We don't have to do that.
+  //
+  // This call will block (due to the way we've written our 
+  // Task class) until all Message_Blocks have cleared the
+  // entire Stream, and all associated threads have exited.
+
+  theStream.close();  
+
+  return 0;
+}
+
+


+
[Tutorial Index] [Continue This Tutorial]
diff --git a/docs/tutorials/014/page06.html b/docs/tutorials/014/page06.html new file mode 100644 index 00000000000..01e5fef889c --- /dev/null +++ b/docs/tutorials/014/page06.html @@ -0,0 +1,28 @@ + + + + + ACE Tutorial 014 + + + +
ACE Tutorial 014
+ +
ACE_Stream Tutorial, Of Sorts
+ +

+


+ +Ok, so that's the Stream tutorial. As you can see, it's much simpler + than the task-chain developed last time but at the same time it + is also much more extensible. + + +


+
[Tutorial Index]
diff --git a/docs/tutorials/014/stream.cpp b/docs/tutorials/014/stream.cpp new file mode 100644 index 00000000000..4925bd5cf10 --- /dev/null +++ b/docs/tutorials/014/stream.cpp @@ -0,0 +1,175 @@ + +// $Id$ + +// stream.cxx +// +// Tutorial regarding a way to use ACE_Stream. +// +// written by bob mcwhirter (bob@netwrench.com) +// +// + +#include "Task.h" +#include "EndTask.h" +// This is our specialized ACE_Task. + +#include +#include +// These are the neccessary ACE headers. + + +typedef ACE_Module Module; +typedef ACE_Stream Stream; +// Just to avoid a lot of typing, typedefs +// are generally a good idea. + +int main(int argc, char *argv[]) +{ + cerr << __LINE__ << endl; + + int numberOfMessages = argc > 1 ? ACE_OS::atoi(argv[1]) : 3; + // unless otherwise specified, just send three messages + // down the stream. + + Stream theStream; + // the ACE_Stream itself. + + cerr << __LINE__ << endl; + + // Now, we instantiate 4 different Tasks. These do not + // need to be all the same class, but they do need to + // all derrive from the same flavor of ACE_Task. + // + // Also, we instantiate a fifth end-cap Task to clean + // up Message_Blocks as they reach the end. + + Task *taskOne; + Task *taskTwo; + Task *taskThree; + Task *taskFour; + Task *taskEnd; + + // Out Task's take two arguments: a name, and the number + // of threads to dedicate to the task. + + taskOne = new Task("Task No. 1", 1); + taskTwo = new Task("Task No. 2", 3); + taskThree = new Task("Task No. 3", 7); + taskFour = new Task("Task No. 4", 1); + + // Our EndTask only takes 1 argument, as it actually + // doesn't spawn any threads for processing. + + taskEnd = new EndTask("End Task"); + + Module *moduleOne; + Module *moduleTwo; + Module *moduleThree; + Module *moduleFour; + Module *moduleEnd; + + // ACE_Stream accepts ACE_Modules, which are simply a pair of + // ACE_Tasks. One is dedicated for writing, while the other + // is dedicated to reading. Think of the writing side as + // downstream, and the reading side as upstream. + // + // We're only working with a unidirection Stream today, + // so we'll only actually install a Task into the write + // side of the module, effectively downstream. + + cerr << __LINE__ << endl; + moduleOne = new Module("Module No. 1", taskOne); + moduleTwo = new Module("Module No. 2", taskTwo); + moduleThree = new Module("Module No. 3", taskThree); + moduleFour = new Module("Module No. 4", taskFour); + moduleEnd = new Module("Module End", taskEnd); + + cerr << __LINE__ << endl; + // Now we push the Modules onto the Stream. + // Pushing adds the module to the head, or + // otherwise prepends it to whatever modules + // are already installed. + + // So, you need to push() the modules on -backwards- + // from our viewpoint. + + if (theStream.push(moduleEnd) == -1) { + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), -1); + } + + cerr << __LINE__ << endl; + if (theStream.push(moduleFour) == -1) { + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), -1); + } + + // As we push a Module onto the Stream, it gets opened. + // When a Module open()s, it opens the Tasks that it contains. + // + // Since we cannot provide an argument to this embedded + // call to open(), we supplied specified the number of + // threads in the constructor of our Tasks. + + if (theStream.push(moduleThree) == -1) { + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), -1); + } + + if (theStream.push(moduleTwo) == -1) { + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), -1); + } + + if (theStream.push(moduleOne) == -1) { + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), -1); + } + + cerr << __LINE__ << endl; + // Now that the Modules are open, the Tasks threads should + // be launching and entering their svc() loop, so we send + // some messages down the Stream. + + int sent = 1; + + ACE_Message_Block *message; + + while (sent <= numberOfMessages) { + + // First, create ourselves a Message_Block. + // see Tutorials 10-13 for more information + // about Message_Blocks and Message_Queues. + + message = new ACE_Message_Block(128); + + // Now, we grab the write-pointer from the Block, + // and sprintf() our text into it. + + ACE_OS::sprintf(message->wr_ptr(), "Message No. %d", sent); + + // All we have to do now is drop the Message_Block + // into the Stream. + + // It is always a good idea to duplicate() a Message_Block + // when you put it into any Message_Queue, as then + // you can always be allowed to release() your copy + // without worry. + + if (theStream.put(message->duplicate(), 0) == -1) { + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "put"), -1); + } + + message->release(); + ++sent; + } + + // Now that we've sent our Message_Blocks, close down + // the Stream. + // + // The Stream will automagically delete the Modules and + // the contained Tasks. We don't have to do that. + // + // This call will block (due to the way we've written our + // Task class) until all Message_Blocks have cleared the + // entire Stream, and all associated threads have exited. + + theStream.close(); + + return 0; +} -- cgit v1.2.1