summaryrefslogtreecommitdiff
path: root/docs
diff options
context:
space:
mode:
authorjcej <jcej@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1998-10-12 23:22:26 +0000
committerjcej <jcej@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1998-10-12 23:22:26 +0000
commitebc019e4ce13412acb20d929d13899ffa1730e5e (patch)
treec2a4c8395342cc7b1e71b0830e110d068153e4fb /docs
parent1568065d7af1935a52db8f0d01ece914ebb4b8d8 (diff)
downloadATCD-ebc019e4ce13412acb20d929d13899ffa1730e5e.tar.gz
*** empty log message ***
Diffstat (limited to 'docs')
-rw-r--r--docs/ACE-tutorials.html3
-rw-r--r--docs/tutorials/014/EndTask.h80
-rw-r--r--docs/tutorials/014/Makefile69
-rw-r--r--docs/tutorials/014/Task.cpp211
-rw-r--r--docs/tutorials/014/Task.h68
-rw-r--r--docs/tutorials/014/page01.html38
-rw-r--r--docs/tutorials/014/page02.html91
-rw-r--r--docs/tutorials/014/page03.html243
-rw-r--r--docs/tutorials/014/page04.html108
-rw-r--r--docs/tutorials/014/page05.html215
-rw-r--r--docs/tutorials/014/page06.html28
-rw-r--r--docs/tutorials/014/stream.cpp175
12 files changed, 1329 insertions, 0 deletions
diff --git a/docs/ACE-tutorials.html b/docs/ACE-tutorials.html
index adc584e2254..73c3af3ca99 100644
--- a/docs/ACE-tutorials.html
+++ b/docs/ACE-tutorials.html
@@ -26,6 +26,9 @@ links provide further information on this topic. <P>
<DT> <img alt="o" src="http://www.cs.wustl.edu/~schmidt/gifs/misc/redball.gif"> <A
HREF="http://www.cs.wustl.edu/~schmidt/ACE_wrappers/docs/tutorials/new-tutorials.html">Developing New Tutorials</A>
+ <DT> <img alt="o" src="http://www.cs.wustl.edu/~schmidt/gifs/misc/redball.gif"> <A
+ HREF="http://dox.netwrench.com/acedox/">Frequently Made Mistakes (FMM)</A>
+
</DL>
</TD>
diff --git a/docs/tutorials/014/EndTask.h b/docs/tutorials/014/EndTask.h
new file mode 100644
index 00000000000..e4ab823f1f9
--- /dev/null
+++ b/docs/tutorials/014/EndTask.h
@@ -0,0 +1,80 @@
+
+// $Id$
+
+// EndTask.h
+//
+// Tutorial regarding a way to use ACE_Stream.
+//
+// written by bob mcwhirter (bob@netwrench.com)
+//
+//
+
+#ifndef ENDTASK_H
+#define ENDTASK_H
+
+#include "Task.h"
+
+// When you setup a Stream and push your modules on,
+// there are two additional modules that go unseen
+// by the user.
+//
+// The Stream pushes on a Stream_Head in front of
+// your first module, and a Stream_Tail behind your
+// last module.
+//
+// If your put() a message to the Stream Tail, it
+// assumes you did so in error. This simple EndTask
+// class allows you to push a message to it and just
+// have it safely Go Away.
+//
+// All this Task does is release the Message_Block
+// and return 0. It's a suitable black-hole.
+
+
+class EndTask : public Task
+{
+
+public:
+
+ typedef Task inherited;
+
+ EndTask(const char *nameOfTask) :
+ inherited(nameOfTask, 0) {
+
+ // when we get open()'d, it with 0 threads
+ // since there is actually no processing to do.
+
+ cerr << __LINE__ << " " << __FILE__ << endl;
+ };
+
+ virtual int open(void *)
+ {
+ cerr << __LINE__ << " " << __FILE__ << endl;
+ return 0;
+ }
+
+ virtual int open(void)
+ {
+ cerr << __LINE__ << " " << __FILE__ << endl;
+ return 0;
+ }
+
+ virtual ~EndTask(void) {
+ };
+
+ virtual int put(ACE_Message_Block *message,
+ ACE_Time_Value *timeout) {
+
+ cerr << __LINE__ << " " << __FILE__ << endl;
+ ACE_UNUSED_ARG(timeout);
+
+ // we don't have anything to do, so
+ // release() the message.
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s EndTask::put() -- releasing Message_Block\n", this->nameOfTask()));
+ message->release();
+ return 0;
+ };
+
+};
+
+#endif // ENDTASK_H
diff --git a/docs/tutorials/014/Makefile b/docs/tutorials/014/Makefile
new file mode 100644
index 00000000000..5a8d8891382
--- /dev/null
+++ b/docs/tutorials/014/Makefile
@@ -0,0 +1,69 @@
+
+# $Id$
+
+#----------------------------------------------------------------------------
+# Local macros
+#----------------------------------------------------------------------------
+
+BIN = stream
+
+FILES = Task
+
+BUILD = $(VBIN)
+
+SRC = $(addsuffix .cpp,$(BIN))
+SRC += $(addsuffix .cpp,$(FILES))
+
+HDR = *.h
+
+#----------------------------------------------------------------------------
+# Include macros and targets
+#----------------------------------------------------------------------------
+
+include $(ACE_ROOT)/include/makeinclude/wrapper_macros.GNU
+include $(ACE_ROOT)/include/makeinclude/macros.GNU
+include $(ACE_ROOT)/include/makeinclude/rules.common.GNU
+include $(ACE_ROOT)/include/makeinclude/rules.nonested.GNU
+include $(ACE_ROOT)/include/makeinclude/rules.lib.GNU
+include $(ACE_ROOT)/include/makeinclude/rules.bin.GNU
+include $(ACE_ROOT)/include/makeinclude/rules.local.GNU
+
+#----------------------------------------------------------------------------
+# Local targets
+#----------------------------------------------------------------------------
+
+HTML : #
+ ../combine *.html
+
+rename : #
+ for i in *.cxx ; do \
+ n=`expr "$$i" : "\(.*\).cxx"` ;\
+ mv $$i $$n.cpp ;\
+ done
+
+Indent : #
+ for i in $(SRC) $(HDR) ; do \
+ indent -npsl -l80 -fca -fc1 -cli0 -cdb -ts2 -bl -bli0 < $$i | \
+ sed -e 's/: :/::/g' \
+ -e 's/^.*\(public:\)/\1/' \
+ -e 's/^.*\(protected:\)/\1/' \
+ -e 's/^.*\(private:\)/\1/' \
+ -e 's/:\(public\)/ : \1/' \
+ -e 's/:\(protected\)/ : \1/' \
+ -e 's/:\(private\)/ : \1/' \
+ -e 's/ / /g' \
+ > $$i~ ;\
+ mv $$i~ $$i ;\
+ done
+
+Depend : depend
+ perl ../007/fix.Makefile
+
+.depend : #
+ touch .depend
+
+#----------------------------------------------------------------------------
+# Dependencies
+#----------------------------------------------------------------------------
+
+include .depend
diff --git a/docs/tutorials/014/Task.cpp b/docs/tutorials/014/Task.cpp
new file mode 100644
index 00000000000..4dc77b6e03b
--- /dev/null
+++ b/docs/tutorials/014/Task.cpp
@@ -0,0 +1,211 @@
+
+// $Id$
+
+// Task.cxx
+//
+// Tutorial regarding a way to use ACE_Stream.
+//
+// written by bob mcwhirter (bob@netwrench.com)
+//
+//
+
+#include <ace/Message_Block.h>
+
+#include "Task.h"
+
+Task::Task(const char * nameOfTask,
+ int numberOfThreads)
+ : d_numberOfThreads(numberOfThreads),
+ d_barrier(numberOfThreads)
+{
+ // Just initialize our name, number of threads, and barrier.
+
+ ACE_OS::strcpy(d_nameOfTask, nameOfTask);
+}
+
+Task::~Task(void)
+{
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::~Task() -- once per Task\n", d_nameOfTask));
+}
+
+int Task::open(void *arg)
+{
+ ACE_UNUSED_ARG(arg);
+
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::open() -- once per Task\n", d_nameOfTask));
+
+ // call ACE_Task::activate() to spawn the threads using
+ // our Task::svc() as the function to be run.
+
+ // FMM -- Frequently Made Mistake --
+ //
+ // If you specify the flag THR_DETACHED when activating the
+ // Task, you will get an assert() violation during close(),
+ // since the Task waits for all of its threads to rejoin.
+ //
+
+ return this->activate(THR_NEW_LWP,
+ d_numberOfThreads);
+}
+
+int Task::put(ACE_Message_Block *message,
+ ACE_Time_Value *timeout)
+{
+ // ACE_Stream uses the put() method of Tasks to send messages.
+ // This defaultly does nothing. Here we link our put() method
+ // directly to our putq() method, so that Messages put() to us
+ // will appear in the Message_Queue that is checked by the
+ // service threads.
+
+ return this->putq(message, timeout);
+}
+
+int Task::close(u_long flags)
+{
+
+ // When the Stream closes the Module, the Module then close()'s the Task
+ // and passing a value of (1) as the flag.
+
+ // When a service thread exits, it calls close() with a value that is not
+ // (1).
+
+ // We use this fact to tell the difference between closing a service thread,
+ // and closing the main Task itself.
+
+ if (flags == 1) {
+
+ // The Module has asked to close the main Task.
+
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::close() -- flags == 1 -- once per Task\n", d_nameOfTask));
+
+ // We create a Message_Block...
+
+ ACE_Message_Block *hangupBlock = new ACE_Message_Block();
+
+ // And make it of the type MB_HANGUP.
+
+ hangupBlock->msg_type(ACE_Message_Block::MB_HANGUP);
+
+ // We then send this Block into the Message_Queue to be seen by the
+ // service threads.
+
+ // Once again we duplicate() the Block as send it off...
+
+ if (this->putq(hangupBlock->duplicate()) == -1) {
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "Task::close() putq"), -1);
+ }
+
+ // ..and we're free to release() our copy of it.
+
+ hangupBlock->release();
+
+ // Now, all we have to do is wait() for the service threads to all
+ // exit. This is where using THR_DETACHED in the activate() method
+ // will come back to haunt you.
+
+ // The Stream waits until this returns before attempting to remove
+ // the next Module/Task group in the Stream. This allows for an
+ // orderly shutting down of the Stream.
+
+ return this->wait();
+
+
+ } else {
+
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::close() -- flags != 1 -- once per servicing thread\n", d_nameOfTask));
+
+ // This is where we can clean up any mess left over by each service thread.
+ // In this Task, there is nothing to do.
+
+ }
+
+ return 0;
+
+}
+
+int Task::svc(void)
+{
+
+ // This is the function that our service threads run once they are spawned.
+
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::svc() -- once per servicing thread\n", d_nameOfTask));
+
+ // First, we wait until all of our peer service threads have arrived
+ // at this point also.
+
+ d_barrier.wait();
+
+ ACE_Message_Block *messageBlock;
+
+ while (1) {
+
+ // And now we loop almost infinitely.
+
+ // getq() will block until a Message_Block is available to be read,
+ // or an error occurs.
+
+ if ( this->getq(messageBlock, 0) == -1) {
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "Task::svc() getq"), -1);
+ }
+
+ if (messageBlock->msg_type() == ACE_Message_Block::MB_HANGUP) {
+
+ // If the Message_Block is of type MB_HANGUP, then we're being asked
+ // to shut down nicely.
+
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::svc() -- HANGUP block received\n", d_nameOfTask));
+
+ // So, we duplicate the Block, and put it back into the Message_Queue,
+ // in case there are some more peer service threads still running.
+
+ if (this->putq(messageBlock->duplicate()) == -1) {
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "Task::svc() putq"), -1);
+ }
+
+ // We release our copy of the Block.
+ messageBlock->release();
+
+ // And we break out of the nearly infinitely loop, and
+ // head towards close() ourselves.
+ break;
+ }
+
+ // If we're here, then we've received a Message_Block that was
+ // not informing us to quit, so we're assuming it's a valid
+ // meaningful Block.
+
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::svc() -- Normal block received\n", d_nameOfTask));
+
+ // We grab the read-pointer from the Block, and display it through a DEBUG statement.
+
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::svc() -- %s\n", d_nameOfTask, messageBlock->rd_ptr() ));
+
+ // We pretend that this takes to time to process the Block.
+ // If you're on a fast machine, you might have to raise this
+ // value to actually witness different threads handling
+ // blocks for each Task.
+
+ ACE_OS::sleep (ACE_Time_Value (0, 250));
+
+ // Since we're part of a Stream, we duplicate the Block, and
+ // send it on to the next Task.
+
+ if (put_next(messageBlock->duplicate()) == -1) {
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "Task::svc() put_next"), -1);
+ }
+
+ // And then we release our copy of it.
+
+ messageBlock->release();
+
+ }
+
+ return 0;
+
+}
+
+
+const char * Task::nameOfTask(void) const
+{
+ return d_nameOfTask;
+}
diff --git a/docs/tutorials/014/Task.h b/docs/tutorials/014/Task.h
new file mode 100644
index 00000000000..e4797f49892
--- /dev/null
+++ b/docs/tutorials/014/Task.h
@@ -0,0 +1,68 @@
+
+// $Id$
+
+// Task.h
+//
+// Tutorial regarding a way to use ACE_Stream.
+//
+// written by bob mcwhirter (bob@netwrench.com)
+//
+//
+
+#ifndef TASK_H
+#define TASK_H
+
+#include <ace/Task.h>
+#include <ace/Synch.h>
+
+// Always typedef when possible.
+
+typedef ACE_Task<ACE_MT_SYNCH> Task_Base;
+
+class Task : public Task_Base
+{
+
+public:
+
+ typedef Task_Base inherited;
+ // This is just good form.
+
+ Task(const char *nameOfTask,
+ int numberOfThreads);
+ // Initialize our Task with a name,
+ // and number of threads to spawn.
+
+ virtual ~Task(void);
+
+ virtual int open(void *arg);
+ // This is provided to prevent compiler complaints
+ // about hidden virtual functions.
+
+ virtual int close(u_long flags);
+ // This closes down the Task and all service threads.
+
+ virtual int put(ACE_Message_Block *message,
+ ACE_Time_Value *timeout);
+ // This is the interface that ACE_Stream uses to
+ // communicate with our Task.
+
+ virtual int svc(void);
+ // This is the actual service loop each of the service
+ // threads iterates through.
+
+ const char *nameOfTask(void) const;
+ // Returns the name of this Task.
+
+private:
+
+ int d_numberOfThreads;
+ char d_nameOfTask[64];
+
+ ACE_Barrier d_barrier;
+ // Simple Barrier to make sure all of our service
+ // threads have entered their loop before accepting
+ // any messages.
+};
+
+
+#endif // TASK_H
diff --git a/docs/tutorials/014/page01.html b/docs/tutorials/014/page01.html
new file mode 100644
index 00000000000..ac4a0ea6d65
--- /dev/null
+++ b/docs/tutorials/014/page01.html
@@ -0,0 +1,38 @@
+<HTML>
+<HEAD>
+ <META HTTP-EQUIV="Content-Type" CONTENT="text/html; charset=iso-8859-1">
+ <META NAME="Author" CONTENT="Bob McWhirter">
+ <TITLE>ACE Tutorial 014</TITLE>
+</HEAD>
+<BODY TEXT="#000000" BGCOLOR="#FFFFFF" LINK="#000FFF" VLINK="#FF0F0F">
+
+<CENTER><B><FONT SIZE=+2>ACE Tutorial 014</FONT></B></CENTER>
+
+<CENTER><B><FONT SIZE=+2>ACE_Stream Tutorial, Of Sorts</FONT></B></CENTER>
+
+<P>
+<HR WIDTH="100%">
+
+<p><b>ACE_Stream</b> is handy when you have several <b>ACE_Task</b> objects
+that you would like to link together.
+
+<p>An intermediate class you we will deal with is the <b>ACE_Module</b>.
+
+<p>The basic plan is to wrap your <b>Task</b> into a <b>Module</b>, push
+the <b>Module</b> onto the <b>Stream</b>. Do this for each <b>Task</b>,
+ and then inject <b>Message_Block</b>s into the <b>Stream</b>.
+
+<p>Each <b>Task</b> then processes the <b>Message_Block</b>, and forwards
+it on to the next <b>Task</b> in the <b>Stream</b>.
+
+<p>If you are not already familiar with <b>Message_Block</b>s and <b>Message_Queue</b>s,
+I highly suggest that you check out <A HREF="../#MQ">Tutorials 10-13</A>.
+
+<p>Streams can be used for both downstream and upstream movement of messages. Used
+this way mirrors closely the way System V STREAMS work. But you don't have to use them
+bidirectionally. In this tutorial, we only use one direction of the Stream. Down.
+
+<p>This tutorial is contributed by Bob McWhirter (bob@netwrench.com)
+<P>
+<P><HR WIDTH="100%">
+<CENTER>[<A HREF="..">Tutorial Index</A>] [<A HREF="page02.html">Continue This Tutorial</A>]</CENTER>
diff --git a/docs/tutorials/014/page02.html b/docs/tutorials/014/page02.html
new file mode 100644
index 00000000000..9dd05c3a0b5
--- /dev/null
+++ b/docs/tutorials/014/page02.html
@@ -0,0 +1,91 @@
+<HTML>
+<HEAD>
+ <META HTTP-EQUIV="Content-Type" CONTENT="text/html; charset=iso-8859-1">
+ <META NAME="Author" CONTENT="Bob McWhirter">
+ <TITLE>ACE Tutorial 014</TITLE>
+</HEAD>
+<BODY TEXT="#000000" BGCOLOR="#FFFFFF" LINK="#000FFF" VLINK="#FF0F0F">
+
+<CENTER><B><FONT SIZE=+2>ACE Tutorial 014</FONT></B></CENTER>
+
+<CENTER><B><FONT SIZE=+2>ACE_Stream Tutorial, Of Sorts</FONT></B></CENTER>
+
+<P>
+<HR WIDTH="100%">
+<P>
+You find pretty soon that anytime you work with ACE_Task&lt;&gt; you
+ have to create a derivative. The Task.h header simply provides
+ that derivative with the overrides we'll need in our application.
+<P>
+<HR WIDTH="100%"><PRE>
+
+// $Id$
+
+// Task.h
+//
+// Tutorial regarding a way to use ACE_Stream.
+//
+// written by bob mcwhirter (bob@netwrench.com)
+//
+//
+
+#ifndef TASK_H
+#define TASK_H
+
+#include &lt;ace/Task.h>
+#include &lt;ace/Synch.h>
+
+// Always typedef when possible.
+
+typedef ACE_Task&lt;ACE_MT_SYNCH> Task_Base;
+
+class Task : public Task_Base
+{
+
+public:
+
+ typedef Task_Base inherited;
+ // This is just good form.
+
+ Task(const char *nameOfTask,
+ int numberOfThreads);
+ // Initialize our Task with a name,
+ // and number of threads to spawn.
+
+ virtual ~Task(void);
+
+ virtual int open(void *arg);
+ // This is provided to prevent compiler complaints
+ // about hidden virtual functions.
+
+ virtual int close(u_long flags);
+ // This closes down the Task and all service threads.
+
+ virtual int put(ACE_Message_Block *message,
+ ACE_Time_Value *timeout);
+ // This is the interface that ACE_Stream uses to
+ // communicate with our Task.
+
+ virtual int svc(void);
+ // This is the actual service loop each of the service
+ // threads iterates through.
+
+ const char *nameOfTask(void) const;
+ // Returns the name of this Task.
+
+private:
+
+ int d_numberOfThreads;
+ char d_nameOfTask[64];
+
+ ACE_Barrier d_barrier;
+ // Simple Barrier to make sure all of our service
+ // threads have entered their loop before accepting
+ // any messages.
+};
+
+
+#endif // TASK_H
+</PRE>
+<P><HR WIDTH="100%">
+<CENTER>[<A HREF="..">Tutorial Index</A>] [<A HREF="page03.html">Continue This Tutorial</A>]</CENTER>
diff --git a/docs/tutorials/014/page03.html b/docs/tutorials/014/page03.html
new file mode 100644
index 00000000000..a0edf26f328
--- /dev/null
+++ b/docs/tutorials/014/page03.html
@@ -0,0 +1,243 @@
+<HTML>
+<HEAD>
+ <META HTTP-EQUIV="Content-Type" CONTENT="text/html; charset=iso-8859-1">
+ <META NAME="Author" CONTENT="Bob McWhirter">
+ <TITLE>ACE Tutorial 014</TITLE>
+</HEAD>
+<BODY TEXT="#000000" BGCOLOR="#FFFFFF" LINK="#000FFF" VLINK="#FF0F0F">
+
+<CENTER><B><FONT SIZE=+2>ACE Tutorial 014</FONT></B></CENTER>
+
+<CENTER><B><FONT SIZE=+2>ACE_Stream Tutorial, Of Sorts</FONT></B></CENTER>
+
+<P>
+<HR WIDTH="100%">
+<P>
+Before we get to main() let's take a look at the Task implementation.
+ While we've overridden several methods, the real work is done in
+ the close() and svc() methods.
+<P>
+Notice how close() figures out if it is being called by the shutdown
+ of the ACE_Stream or by the exit of svc(). The magic here is
+ provided by the <i>flags</i> parameter. By handling the stream
+ shutdown in this way, we don't have to do anything strange in
+ svc(). We also don't end up with extra hangup messages in the
+ queue when the dust all settles down.
+<P>
+Like our other tutorials, svc() looks for a hangup and processes data.
+<P>
+<HR WIDTH="100%"><PRE>
+
+// $Id$
+
+// Task.cxx
+//
+// Tutorial regarding a way to use ACE_Stream.
+//
+// written by bob mcwhirter (bob@netwrench.com)
+//
+//
+
+#include &lt;ace/Message_Block.h>
+
+#include "Task.h"
+
+Task::Task(const char * nameOfTask,
+ int numberOfThreads)
+ : d_numberOfThreads(numberOfThreads),
+ d_barrier(numberOfThreads)
+{
+ // Just initialize our name, number of threads, and barrier.
+
+ ACE_OS::strcpy(d_nameOfTask, nameOfTask);
+}
+
+Task::~Task(void)
+{
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::~Task() -- once per Task\n", d_nameOfTask));
+}
+
+int Task::open(void *arg)
+{
+ ACE_UNUSED_ARG(arg);
+
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::open() -- once per Task\n", d_nameOfTask));
+
+ // call ACE_Task::activate() to spawn the threads using
+ // our Task::svc() as the function to be run.
+
+ // FMM -- Frequently Made Mistake --
+ //
+ // If you specify the flag THR_DETACHED when activating the
+ // Task, you will get an assert() violation during close(),
+ // since the Task waits for all of its threads to rejoin.
+ //
+
+ return this->activate(THR_NEW_LWP,
+ d_numberOfThreads);
+}
+
+int Task::put(ACE_Message_Block *message,
+ ACE_Time_Value *timeout)
+{
+ // ACE_Stream uses the put() method of Tasks to send messages.
+ // This defaultly does nothing. Here we link our put() method
+ // directly to our putq() method, so that Messages put() to us
+ // will appear in the Message_Queue that is checked by the
+ // service threads.
+
+ return this->putq(message, timeout);
+}
+
+int Task::close(u_long flags)
+{
+
+ // When the Stream closes the Module, the Module then close()'s the Task
+ // and passing a value of (1) as the flag.
+
+ // When a service thread exits, it calls close() with a value that is not
+ // (1).
+
+ // We use this fact to tell the difference between closing a service thread,
+ // and closing the main Task itself.
+
+ if (flags == 1) {
+
+ // The Module has asked to close the main Task.
+
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::close() -- flags == 1 -- once per Task\n", d_nameOfTask));
+
+ // We create a Message_Block...
+
+ ACE_Message_Block *hangupBlock = new ACE_Message_Block();
+
+ // And make it of the type MB_HANGUP.
+
+ hangupBlock->msg_type(ACE_Message_Block::MB_HANGUP);
+
+ // We then send this Block into the Message_Queue to be seen by the
+ // service threads.
+
+ // Once again we duplicate() the Block as send it off...
+
+ if (this->putq(hangupBlock->duplicate()) == -1) {
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "Task::close() putq"), -1);
+ }
+
+ // ..and we're free to release() our copy of it.
+
+ hangupBlock->release();
+
+ // Now, all we have to do is wait() for the service threads to all
+ // exit. This is where using THR_DETACHED in the activate() method
+ // will come back to haunt you.
+
+ // The Stream waits until this returns before attempting to remove
+ // the next Module/Task group in the Stream. This allows for an
+ // orderly shutting down of the Stream.
+
+ return this->wait();
+
+
+ } else {
+
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::close() -- flags != 1 -- once per servicing thread\n", d_nameOfTask));
+
+ // This is where we can clean up any mess left over by each service thread.
+ // In this Task, there is nothing to do.
+
+ }
+
+ return 0;
+
+}
+
+int Task::svc(void)
+{
+
+ // This is the function that our service threads run once they are spawned.
+
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::svc() -- once per servicing thread\n", d_nameOfTask));
+
+ // First, we wait until all of our peer service threads have arrived
+ // at this point also.
+
+ d_barrier.wait();
+
+ ACE_Message_Block *messageBlock;
+
+ while (1) {
+
+ // And now we loop almost infinitely.
+
+ // getq() will block until a Message_Block is available to be read,
+ // or an error occurs.
+
+ if ( this->getq(messageBlock, 0) == -1) {
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "Task::svc() getq"), -1);
+ }
+
+ if (messageBlock->msg_type() == ACE_Message_Block::MB_HANGUP) {
+
+ // If the Message_Block is of type MB_HANGUP, then we're being asked
+ // to shut down nicely.
+
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::svc() -- HANGUP block received\n", d_nameOfTask));
+
+ // So, we duplicate the Block, and put it back into the Message_Queue,
+ // in case there are some more peer service threads still running.
+
+ if (this->putq(messageBlock->duplicate()) == -1) {
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "Task::svc() putq"), -1);
+ }
+
+ // We release our copy of the Block.
+ messageBlock->release();
+
+ // And we break out of the nearly infinitely loop, and
+ // head towards close() ourselves.
+ break;
+ }
+
+ // If we're here, then we've received a Message_Block that was
+ // not informing us to quit, so we're assuming it's a valid
+ // meaningful Block.
+
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::svc() -- Normal block received\n", d_nameOfTask));
+
+ // We grab the read-pointer from the Block, and display it through a DEBUG statement.
+
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::svc() -- %s\n", d_nameOfTask, messageBlock->rd_ptr() ));
+
+ // We pretend that this takes to time to process the Block.
+ // If you're on a fast machine, you might have to raise this
+ // value to actually witness different threads handling
+ // blocks for each Task.
+
+ ACE_OS::sleep (ACE_Time_Value (0, 250));
+
+ // Since we're part of a Stream, we duplicate the Block, and
+ // send it on to the next Task.
+
+ if (put_next(messageBlock->duplicate()) == -1) {
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "Task::svc() put_next"), -1);
+ }
+
+ // And then we release our copy of it.
+
+ messageBlock->release();
+
+ }
+
+ return 0;
+
+}
+
+
+const char * Task::nameOfTask(void) const
+{
+ return d_nameOfTask;
+}
+</PRE>
+<P><HR WIDTH="100%">
+<CENTER>[<A HREF="..">Tutorial Index</A>] [<A HREF="page04.html">Continue This Tutorial</A>]</CENTER>
diff --git a/docs/tutorials/014/page04.html b/docs/tutorials/014/page04.html
new file mode 100644
index 00000000000..c173178dbc2
--- /dev/null
+++ b/docs/tutorials/014/page04.html
@@ -0,0 +1,108 @@
+<HTML>
+<HEAD>
+ <META HTTP-EQUIV="Content-Type" CONTENT="text/html; charset=iso-8859-1">
+ <META NAME="Author" CONTENT="Bob McWhirter">
+ <TITLE>ACE Tutorial 014</TITLE>
+</HEAD>
+<BODY TEXT="#000000" BGCOLOR="#FFFFFF" LINK="#000FFF" VLINK="#FF0F0F">
+
+<CENTER><B><FONT SIZE=+2>ACE Tutorial 014</FONT></B></CENTER>
+
+<CENTER><B><FONT SIZE=+2>ACE_Stream Tutorial, Of Sorts</FONT></B></CENTER>
+
+<P>
+<HR WIDTH="100%">
+<P>
+As stated in the comments below, the default action of the task at the
+ stream tail is to treat any received data as an error. In our
+ implementation it will often happen that data gets through to
+ the tail. How, then, do we handle this without creating an
+ error condition? Simple: Create a custom Task for use as the
+ stream tail that doesn't consider it an error to receive data.
+<P>
+Read on...
+<P>
+<HR WIDTH="100%"><PRE>
+
+// $Id$
+
+// EndTask.h
+//
+// Tutorial regarding a way to use ACE_Stream.
+//
+// written by bob mcwhirter (bob@netwrench.com)
+//
+//
+
+#ifndef ENDTASK_H
+#define ENDTASK_H
+
+#include "Task.h"
+
+// When you setup a Stream and push your modules on,
+// there are two additional modules that go unseen
+// by the user.
+//
+// The Stream pushes on a Stream_Head in front of
+// your first module, and a Stream_Tail behind your
+// last module.
+//
+// If your put() a message to the Stream Tail, it
+// assumes you did so in error. This simple EndTask
+// class allows you to push a message to it and just
+// have it safely Go Away.
+//
+// All this Task does is release the Message_Block
+// and return 0. It's a suitable black-hole.
+
+
+class EndTask : public Task
+{
+
+public:
+
+ typedef Task inherited;
+
+ EndTask(const char *nameOfTask) :
+ inherited(nameOfTask, 0) {
+
+ // when we get open()'d, it with 0 threads
+ // since there is actually no processing to do.
+
+ cerr &lt;&lt; __LINE__ &lt;&lt; " " &lt;&lt; __FILE__ &lt;&lt; endl;
+ };
+
+ virtual int open(void *)
+ {
+ cerr &lt;&lt; __LINE__ &lt;&lt; " " &lt;&lt; __FILE__ &lt;&lt; endl;
+ return 0;
+ }
+
+ virtual int open(void)
+ {
+ cerr &lt;&lt; __LINE__ &lt;&lt; " " &lt;&lt; __FILE__ &lt;&lt; endl;
+ return 0;
+ }
+
+ virtual ~EndTask(void) {
+ };
+
+ virtual int put(ACE_Message_Block *message,
+ ACE_Time_Value *timeout) {
+
+ cerr &lt;&lt; __LINE__ &lt;&lt; " " &lt;&lt; __FILE__ &lt;&lt; endl;
+ ACE_UNUSED_ARG(timeout);
+
+ // we don't have anything to do, so
+ // release() the message.
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s EndTask::put() -- releasing Message_Block\n", this->nameOfTask()));
+ message->release();
+ return 0;
+ };
+
+};
+
+#endif // ENDTASK_H
+</PRE>
+<P><HR WIDTH="100%">
+<CENTER>[<A HREF="..">Tutorial Index</A>] [<A HREF="page05.html">Continue This Tutorial</A>]</CENTER>
diff --git a/docs/tutorials/014/page05.html b/docs/tutorials/014/page05.html
new file mode 100644
index 00000000000..c654dd69a83
--- /dev/null
+++ b/docs/tutorials/014/page05.html
@@ -0,0 +1,215 @@
+<HTML>
+<HEAD>
+ <META HTTP-EQUIV="Content-Type" CONTENT="text/html; charset=iso-8859-1">
+ <META NAME="Author" CONTENT="Bob McWhirter">
+ <TITLE>ACE Tutorial 014</TITLE>
+</HEAD>
+<BODY TEXT="#000000" BGCOLOR="#FFFFFF" LINK="#000FFF" VLINK="#FF0F0F">
+
+<CENTER><B><FONT SIZE=+2>ACE Tutorial 014</FONT></B></CENTER>
+
+<CENTER><B><FONT SIZE=+2>ACE_Stream Tutorial, Of Sorts</FONT></B></CENTER>
+
+<P>
+<HR WIDTH="100%">
+<P>
+Now we come to main(). In the previous task-chain tutorial
+ every thread pool had to have the same number of threads. This
+ time around, we leverage the construction method of ACE_Stream
+ and ACE_Module to customize the thread-pool size in each
+ ACE_Task of the stream.
+<P>
+Remember EndTask from the previous page? We create one here and push
+ it into the stream to take care of cleaning up the messages.
+ Technically, we could have replaced the default Tail task
+ created by the ACE framework but it seems to make more sense to
+ just push our "tail" onto the stream like the other tasks. The
+ caveat to this method is that you must be sure you don't push()
+ any other Modules behind the EndTask!
+<P>
+Once the stream of modules containing tasks is all setup then we can
+ put() some data into the stream for processing. The clever use
+ of Task::close() makes shutting downt the stream easier than
+ ever. No messing with hangup messages at the application level,
+ just close() when you're done! What could be simpler?
+<P>
+<HR WIDTH="100%">
+<PRE>
+
+// $Id$
+
+// stream.cxx
+//
+// Tutorial regarding a way to use ACE_Stream.
+//
+// written by bob mcwhirter (bob@netwrench.com)
+//
+//
+
+#include "Task.h"
+#include "EndTask.h"
+// This is our specialized ACE_Task.
+
+#include &lt;ace/Module.h>
+#include &lt;ace/Stream.h>
+// These are the neccessary ACE headers.
+
+
+typedef ACE_Module&lt;ACE_MT_SYNCH> Module;
+typedef ACE_Stream&lt;ACE_MT_SYNCH> Stream;
+// Just to avoid a lot of typing, typedefs
+// are generally a good idea.
+
+int main(int argc, char *argv[])
+{
+ cerr &lt;&lt; __LINE__ &lt;&lt; endl;
+
+ int numberOfMessages = argc > 1 ? ACE_OS::atoi(argv[1]) : 3;
+ // unless otherwise specified, just send three messages
+ // down the stream.
+
+ Stream theStream;
+ // the ACE_Stream itself.
+
+ cerr &lt;&lt; __LINE__ &lt;&lt; endl;
+
+ // Now, we instantiate 4 different Tasks. These do not
+ // need to be all the same class, but they do need to
+ // all derrive from the same flavor of ACE_Task.
+ //
+ // Also, we instantiate a fifth end-cap Task to clean
+ // up Message_Blocks as they reach the end.
+
+ Task *taskOne;
+ Task *taskTwo;
+ Task *taskThree;
+ Task *taskFour;
+ Task *taskEnd;
+
+ // Out Task's take two arguments: a name, and the number
+ // of threads to dedicate to the task.
+
+ taskOne = new Task("Task No. 1", 1);
+ taskTwo = new Task("Task No. 2", 3);
+ taskThree = new Task("Task No. 3", 7);
+ taskFour = new Task("Task No. 4", 1);
+
+ // Our EndTask only takes 1 argument, as it actually
+ // doesn't spawn any threads for processing.
+
+ taskEnd = new EndTask("End Task");
+
+ Module *moduleOne;
+ Module *moduleTwo;
+ Module *moduleThree;
+ Module *moduleFour;
+ Module *moduleEnd;
+
+ // ACE_Stream accepts ACE_Modules, which are simply a pair of
+ // ACE_Tasks. One is dedicated for writing, while the other
+ // is dedicated to reading. Think of the writing side as
+ // downstream, and the reading side as upstream.
+ //
+ // We're only working with a unidirection Stream today,
+ // so we'll only actually install a Task into the write
+ // side of the module, effectively downstream.
+
+ cerr &lt;&lt; __LINE__ &lt;&lt; endl;
+ moduleOne = new Module("Module No. 1", taskOne);
+ moduleTwo = new Module("Module No. 2", taskTwo);
+ moduleThree = new Module("Module No. 3", taskThree);
+ moduleFour = new Module("Module No. 4", taskFour);
+ moduleEnd = new Module("Module End", taskEnd);
+
+ cerr &lt;&lt; __LINE__ &lt;&lt; endl;
+ // Now we push the Modules onto the Stream.
+ // Pushing adds the module to the head, or
+ // otherwise prepends it to whatever modules
+ // are already installed.
+
+ // So, you need to push() the modules on -backwards-
+ // from our viewpoint.
+
+ if (theStream.push(moduleEnd) == -1) {
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), -1);
+ }
+
+ cerr &lt;&lt; __LINE__ &lt;&lt; endl;
+ if (theStream.push(moduleFour) == -1) {
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), -1);
+ }
+
+ // As we push a Module onto the Stream, it gets opened.
+ // When a Module open()s, it opens the Tasks that it contains.
+ //
+ // Since we cannot provide an argument to this embedded
+ // call to open(), we supplied specified the number of
+ // threads in the constructor of our Tasks.
+
+ if (theStream.push(moduleThree) == -1) {
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), -1);
+ }
+
+ if (theStream.push(moduleTwo) == -1) {
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), -1);
+ }
+
+ if (theStream.push(moduleOne) == -1) {
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), -1);
+ }
+
+ cerr &lt;&lt; __LINE__ &lt;&lt; endl;
+ // Now that the Modules are open, the Tasks threads should
+ // be launching and entering their svc() loop, so we send
+ // some messages down the Stream.
+
+ int sent = 1;
+
+ ACE_Message_Block *message;
+
+ while (sent &lt;= numberOfMessages) {
+
+ // First, create ourselves a Message_Block.
+ // see Tutorials 10-13 for more information
+ // about Message_Blocks and Message_Queues.
+
+ message = new ACE_Message_Block(128);
+
+ // Now, we grab the write-pointer from the Block,
+ // and sprintf() our text into it.
+
+ ACE_OS::sprintf(message->wr_ptr(), "Message No. %d", sent);
+
+ // All we have to do now is drop the Message_Block
+ // into the Stream.
+
+ // It is always a good idea to duplicate() a Message_Block
+ // when you put it into any Message_Queue, as then
+ // you can always be allowed to release() your copy
+ // without worry.
+
+ if (theStream.put(message->duplicate(), 0) == -1) {
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "put"), -1);
+ }
+
+ message->release();
+ ++sent;
+ }
+
+ // Now that we've sent our Message_Blocks, close down
+ // the Stream.
+ //
+ // The Stream will automagically delete the Modules and
+ // the contained Tasks. We don't have to do that.
+ //
+ // This call will block (due to the way we've written our
+ // Task class) until all Message_Blocks have cleared the
+ // entire Stream, and all associated threads have exited.
+
+ theStream.close();
+
+ return 0;
+}
+</PRE>
+<P><HR WIDTH="100%">
+<CENTER>[<A HREF="..">Tutorial Index</A>] [<A HREF="page06.html">Continue This Tutorial</A>]</CENTER>
diff --git a/docs/tutorials/014/page06.html b/docs/tutorials/014/page06.html
new file mode 100644
index 00000000000..01e5fef889c
--- /dev/null
+++ b/docs/tutorials/014/page06.html
@@ -0,0 +1,28 @@
+<HTML>
+<HEAD>
+ <META HTTP-EQUIV="Content-Type" CONTENT="text/html; charset=iso-8859-1">
+ <META NAME="Author" CONTENT="Bob McWhirter">
+ <TITLE>ACE Tutorial 014</TITLE>
+</HEAD>
+<BODY TEXT="#000000" BGCOLOR="#FFFFFF" LINK="#000FFF" VLINK="#FF0F0F">
+
+<CENTER><B><FONT SIZE=+2>ACE Tutorial 014</FONT></B></CENTER>
+
+<CENTER><B><FONT SIZE=+2>ACE_Stream Tutorial, Of Sorts</FONT></B></CENTER>
+
+<P>
+<HR WIDTH="100%">
+
+Ok, so that's the Stream tutorial. As you can see, it's much simpler
+ than the task-chain developed last time but at the same time it
+ is also much more extensible.
+
+<UL>
+<LI><A HREF="Makefile">Makefile</A>
+<LI><A HREF="Task.h">Task.h</A>
+<LI><A HREF="Task.cpp">Task.cpp</A>
+<LI><A HREF="EndTask.h">EndTask.h</A>
+<LI><A HREF="stream.cpp">stream.cpp</A>
+</UL>
+<P><HR WIDTH="100%">
+<CENTER>[<A HREF="..">Tutorial Index</A>] </CENTER>
diff --git a/docs/tutorials/014/stream.cpp b/docs/tutorials/014/stream.cpp
new file mode 100644
index 00000000000..4925bd5cf10
--- /dev/null
+++ b/docs/tutorials/014/stream.cpp
@@ -0,0 +1,175 @@
+
+// $Id$
+
+// stream.cxx
+//
+// Tutorial regarding a way to use ACE_Stream.
+//
+// written by bob mcwhirter (bob@netwrench.com)
+//
+//
+
+#include "Task.h"
+#include "EndTask.h"
+// This is our specialized ACE_Task.
+
+#include <ace/Module.h>
+#include <ace/Stream.h>
+// These are the neccessary ACE headers.
+
+
+typedef ACE_Module<ACE_MT_SYNCH> Module;
+typedef ACE_Stream<ACE_MT_SYNCH> Stream;
+// Just to avoid a lot of typing, typedefs
+// are generally a good idea.
+
+int main(int argc, char *argv[])
+{
+ cerr << __LINE__ << endl;
+
+ int numberOfMessages = argc > 1 ? ACE_OS::atoi(argv[1]) : 3;
+ // unless otherwise specified, just send three messages
+ // down the stream.
+
+ Stream theStream;
+ // the ACE_Stream itself.
+
+ cerr << __LINE__ << endl;
+
+ // Now, we instantiate 4 different Tasks. These do not
+ // need to be all the same class, but they do need to
+ // all derrive from the same flavor of ACE_Task.
+ //
+ // Also, we instantiate a fifth end-cap Task to clean
+ // up Message_Blocks as they reach the end.
+
+ Task *taskOne;
+ Task *taskTwo;
+ Task *taskThree;
+ Task *taskFour;
+ Task *taskEnd;
+
+ // Out Task's take two arguments: a name, and the number
+ // of threads to dedicate to the task.
+
+ taskOne = new Task("Task No. 1", 1);
+ taskTwo = new Task("Task No. 2", 3);
+ taskThree = new Task("Task No. 3", 7);
+ taskFour = new Task("Task No. 4", 1);
+
+ // Our EndTask only takes 1 argument, as it actually
+ // doesn't spawn any threads for processing.
+
+ taskEnd = new EndTask("End Task");
+
+ Module *moduleOne;
+ Module *moduleTwo;
+ Module *moduleThree;
+ Module *moduleFour;
+ Module *moduleEnd;
+
+ // ACE_Stream accepts ACE_Modules, which are simply a pair of
+ // ACE_Tasks. One is dedicated for writing, while the other
+ // is dedicated to reading. Think of the writing side as
+ // downstream, and the reading side as upstream.
+ //
+ // We're only working with a unidirection Stream today,
+ // so we'll only actually install a Task into the write
+ // side of the module, effectively downstream.
+
+ cerr << __LINE__ << endl;
+ moduleOne = new Module("Module No. 1", taskOne);
+ moduleTwo = new Module("Module No. 2", taskTwo);
+ moduleThree = new Module("Module No. 3", taskThree);
+ moduleFour = new Module("Module No. 4", taskFour);
+ moduleEnd = new Module("Module End", taskEnd);
+
+ cerr << __LINE__ << endl;
+ // Now we push the Modules onto the Stream.
+ // Pushing adds the module to the head, or
+ // otherwise prepends it to whatever modules
+ // are already installed.
+
+ // So, you need to push() the modules on -backwards-
+ // from our viewpoint.
+
+ if (theStream.push(moduleEnd) == -1) {
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), -1);
+ }
+
+ cerr << __LINE__ << endl;
+ if (theStream.push(moduleFour) == -1) {
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), -1);
+ }
+
+ // As we push a Module onto the Stream, it gets opened.
+ // When a Module open()s, it opens the Tasks that it contains.
+ //
+ // Since we cannot provide an argument to this embedded
+ // call to open(), we supplied specified the number of
+ // threads in the constructor of our Tasks.
+
+ if (theStream.push(moduleThree) == -1) {
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), -1);
+ }
+
+ if (theStream.push(moduleTwo) == -1) {
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), -1);
+ }
+
+ if (theStream.push(moduleOne) == -1) {
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), -1);
+ }
+
+ cerr << __LINE__ << endl;
+ // Now that the Modules are open, the Tasks threads should
+ // be launching and entering their svc() loop, so we send
+ // some messages down the Stream.
+
+ int sent = 1;
+
+ ACE_Message_Block *message;
+
+ while (sent <= numberOfMessages) {
+
+ // First, create ourselves a Message_Block.
+ // see Tutorials 10-13 for more information
+ // about Message_Blocks and Message_Queues.
+
+ message = new ACE_Message_Block(128);
+
+ // Now, we grab the write-pointer from the Block,
+ // and sprintf() our text into it.
+
+ ACE_OS::sprintf(message->wr_ptr(), "Message No. %d", sent);
+
+ // All we have to do now is drop the Message_Block
+ // into the Stream.
+
+ // It is always a good idea to duplicate() a Message_Block
+ // when you put it into any Message_Queue, as then
+ // you can always be allowed to release() your copy
+ // without worry.
+
+ if (theStream.put(message->duplicate(), 0) == -1) {
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "put"), -1);
+ }
+
+ message->release();
+ ++sent;
+ }
+
+ // Now that we've sent our Message_Blocks, close down
+ // the Stream.
+ //
+ // The Stream will automagically delete the Modules and
+ // the contained Tasks. We don't have to do that.
+ //
+ // This call will block (due to the way we've written our
+ // Task class) until all Message_Blocks have cleared the
+ // entire Stream, and all associated threads have exited.
+
+ theStream.close();
+
+ return 0;
+}