diff options
author | jcej <jcej@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1998-10-12 23:22:26 +0000 |
---|---|---|
committer | jcej <jcej@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1998-10-12 23:22:26 +0000 |
commit | ebc019e4ce13412acb20d929d13899ffa1730e5e (patch) | |
tree | c2a4c8395342cc7b1e71b0830e110d068153e4fb | |
parent | 1568065d7af1935a52db8f0d01ece914ebb4b8d8 (diff) | |
download | ATCD-ebc019e4ce13412acb20d929d13899ffa1730e5e.tar.gz |
*** empty log message ***
-rw-r--r-- | ChangeLog-98b | 12 | ||||
-rw-r--r-- | docs/ACE-tutorials.html | 3 | ||||
-rw-r--r-- | docs/tutorials/014/EndTask.h | 80 | ||||
-rw-r--r-- | docs/tutorials/014/Makefile | 69 | ||||
-rw-r--r-- | docs/tutorials/014/Task.cpp | 211 | ||||
-rw-r--r-- | docs/tutorials/014/Task.h | 68 | ||||
-rw-r--r-- | docs/tutorials/014/page01.html | 38 | ||||
-rw-r--r-- | docs/tutorials/014/page02.html | 91 | ||||
-rw-r--r-- | docs/tutorials/014/page03.html | 243 | ||||
-rw-r--r-- | docs/tutorials/014/page04.html | 108 | ||||
-rw-r--r-- | docs/tutorials/014/page05.html | 215 | ||||
-rw-r--r-- | docs/tutorials/014/page06.html | 28 | ||||
-rw-r--r-- | docs/tutorials/014/stream.cpp | 175 |
13 files changed, 1341 insertions, 0 deletions
diff --git a/ChangeLog-98b b/ChangeLog-98b index f4b90843f5c..ffd5ed4db63 100644 --- a/ChangeLog-98b +++ b/ChangeLog-98b @@ -1,3 +1,15 @@ +Mon Oct 12 19:19:27 1998 James CE Johnson <jcej@chiroptera.tragus.org> + + * docs/ACE-tutorials.html: + Added a link to Bob McWhirter's Frequently Made Mistakes page. + + * docs/tutorials/014/Makefile + * docs/tutorials/014/Task.{cpp|h} + * docs/tutorials/014/EndTask.h + * docs/tutorials/014/stream.cpp + * docs/tutorials/014/page0[123456].html + A slightly modified version of Bob McWhirter's ACE_Stream tutorial. + Mon Oct 12 14:39:15 1998 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu> * examples/ASX/CCM_App/CCM_App.cpp: We need to make sure and use diff --git a/docs/ACE-tutorials.html b/docs/ACE-tutorials.html index adc584e2254..73c3af3ca99 100644 --- a/docs/ACE-tutorials.html +++ b/docs/ACE-tutorials.html @@ -26,6 +26,9 @@ links provide further information on this topic. <P> <DT> <img alt="o" src="http://www.cs.wustl.edu/~schmidt/gifs/misc/redball.gif"> <A HREF="http://www.cs.wustl.edu/~schmidt/ACE_wrappers/docs/tutorials/new-tutorials.html">Developing New Tutorials</A> + <DT> <img alt="o" src="http://www.cs.wustl.edu/~schmidt/gifs/misc/redball.gif"> <A + HREF="http://dox.netwrench.com/acedox/">Frequently Made Mistakes (FMM)</A> + </DL> </TD> diff --git a/docs/tutorials/014/EndTask.h b/docs/tutorials/014/EndTask.h new file mode 100644 index 00000000000..e4ab823f1f9 --- /dev/null +++ b/docs/tutorials/014/EndTask.h @@ -0,0 +1,80 @@ + +// $Id$ + +// EndTask.h +// +// Tutorial regarding a way to use ACE_Stream. +// +// written by bob mcwhirter (bob@netwrench.com) +// +// + +#ifndef ENDTASK_H +#define ENDTASK_H + +#include "Task.h" + +// When you setup a Stream and push your modules on, +// there are two additional modules that go unseen +// by the user. +// +// The Stream pushes on a Stream_Head in front of +// your first module, and a Stream_Tail behind your +// last module. +// +// If your put() a message to the Stream Tail, it +// assumes you did so in error. This simple EndTask +// class allows you to push a message to it and just +// have it safely Go Away. +// +// All this Task does is release the Message_Block +// and return 0. It's a suitable black-hole. + + +class EndTask : public Task +{ + +public: + + typedef Task inherited; + + EndTask(const char *nameOfTask) : + inherited(nameOfTask, 0) { + + // when we get open()'d, it with 0 threads + // since there is actually no processing to do. + + cerr << __LINE__ << " " << __FILE__ << endl; + }; + + virtual int open(void *) + { + cerr << __LINE__ << " " << __FILE__ << endl; + return 0; + } + + virtual int open(void) + { + cerr << __LINE__ << " " << __FILE__ << endl; + return 0; + } + + virtual ~EndTask(void) { + }; + + virtual int put(ACE_Message_Block *message, + ACE_Time_Value *timeout) { + + cerr << __LINE__ << " " << __FILE__ << endl; + ACE_UNUSED_ARG(timeout); + + // we don't have anything to do, so + // release() the message. + ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s EndTask::put() -- releasing Message_Block\n", this->nameOfTask())); + message->release(); + return 0; + }; + +}; + +#endif // ENDTASK_H diff --git a/docs/tutorials/014/Makefile b/docs/tutorials/014/Makefile new file mode 100644 index 00000000000..5a8d8891382 --- /dev/null +++ b/docs/tutorials/014/Makefile @@ -0,0 +1,69 @@ + +# $Id$ + +#---------------------------------------------------------------------------- +# Local macros +#---------------------------------------------------------------------------- + +BIN = stream + +FILES = Task + +BUILD = $(VBIN) + +SRC = $(addsuffix .cpp,$(BIN)) +SRC += $(addsuffix .cpp,$(FILES)) + +HDR = *.h + +#---------------------------------------------------------------------------- +# Include macros and targets +#---------------------------------------------------------------------------- + +include $(ACE_ROOT)/include/makeinclude/wrapper_macros.GNU +include $(ACE_ROOT)/include/makeinclude/macros.GNU +include $(ACE_ROOT)/include/makeinclude/rules.common.GNU +include $(ACE_ROOT)/include/makeinclude/rules.nonested.GNU +include $(ACE_ROOT)/include/makeinclude/rules.lib.GNU +include $(ACE_ROOT)/include/makeinclude/rules.bin.GNU +include $(ACE_ROOT)/include/makeinclude/rules.local.GNU + +#---------------------------------------------------------------------------- +# Local targets +#---------------------------------------------------------------------------- + +HTML : # + ../combine *.html + +rename : # + for i in *.cxx ; do \ + n=`expr "$$i" : "\(.*\).cxx"` ;\ + mv $$i $$n.cpp ;\ + done + +Indent : # + for i in $(SRC) $(HDR) ; do \ + indent -npsl -l80 -fca -fc1 -cli0 -cdb -ts2 -bl -bli0 < $$i | \ + sed -e 's/: :/::/g' \ + -e 's/^.*\(public:\)/\1/' \ + -e 's/^.*\(protected:\)/\1/' \ + -e 's/^.*\(private:\)/\1/' \ + -e 's/:\(public\)/ : \1/' \ + -e 's/:\(protected\)/ : \1/' \ + -e 's/:\(private\)/ : \1/' \ + -e 's/ / /g' \ + > $$i~ ;\ + mv $$i~ $$i ;\ + done + +Depend : depend + perl ../007/fix.Makefile + +.depend : # + touch .depend + +#---------------------------------------------------------------------------- +# Dependencies +#---------------------------------------------------------------------------- + +include .depend diff --git a/docs/tutorials/014/Task.cpp b/docs/tutorials/014/Task.cpp new file mode 100644 index 00000000000..4dc77b6e03b --- /dev/null +++ b/docs/tutorials/014/Task.cpp @@ -0,0 +1,211 @@ + +// $Id$ + +// Task.cxx +// +// Tutorial regarding a way to use ACE_Stream. +// +// written by bob mcwhirter (bob@netwrench.com) +// +// + +#include <ace/Message_Block.h> + +#include "Task.h" + +Task::Task(const char * nameOfTask, + int numberOfThreads) + : d_numberOfThreads(numberOfThreads), + d_barrier(numberOfThreads) +{ + // Just initialize our name, number of threads, and barrier. + + ACE_OS::strcpy(d_nameOfTask, nameOfTask); +} + +Task::~Task(void) +{ + ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::~Task() -- once per Task\n", d_nameOfTask)); +} + +int Task::open(void *arg) +{ + ACE_UNUSED_ARG(arg); + + ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::open() -- once per Task\n", d_nameOfTask)); + + // call ACE_Task::activate() to spawn the threads using + // our Task::svc() as the function to be run. + + // FMM -- Frequently Made Mistake -- + // + // If you specify the flag THR_DETACHED when activating the + // Task, you will get an assert() violation during close(), + // since the Task waits for all of its threads to rejoin. + // + + return this->activate(THR_NEW_LWP, + d_numberOfThreads); +} + +int Task::put(ACE_Message_Block *message, + ACE_Time_Value *timeout) +{ + // ACE_Stream uses the put() method of Tasks to send messages. + // This defaultly does nothing. Here we link our put() method + // directly to our putq() method, so that Messages put() to us + // will appear in the Message_Queue that is checked by the + // service threads. + + return this->putq(message, timeout); +} + +int Task::close(u_long flags) +{ + + // When the Stream closes the Module, the Module then close()'s the Task + // and passing a value of (1) as the flag. + + // When a service thread exits, it calls close() with a value that is not + // (1). + + // We use this fact to tell the difference between closing a service thread, + // and closing the main Task itself. + + if (flags == 1) { + + // The Module has asked to close the main Task. + + ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::close() -- flags == 1 -- once per Task\n", d_nameOfTask)); + + // We create a Message_Block... + + ACE_Message_Block *hangupBlock = new ACE_Message_Block(); + + // And make it of the type MB_HANGUP. + + hangupBlock->msg_type(ACE_Message_Block::MB_HANGUP); + + // We then send this Block into the Message_Queue to be seen by the + // service threads. + + // Once again we duplicate() the Block as send it off... + + if (this->putq(hangupBlock->duplicate()) == -1) { + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "Task::close() putq"), -1); + } + + // ..and we're free to release() our copy of it. + + hangupBlock->release(); + + // Now, all we have to do is wait() for the service threads to all + // exit. This is where using THR_DETACHED in the activate() method + // will come back to haunt you. + + // The Stream waits until this returns before attempting to remove + // the next Module/Task group in the Stream. This allows for an + // orderly shutting down of the Stream. + + return this->wait(); + + + } else { + + ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::close() -- flags != 1 -- once per servicing thread\n", d_nameOfTask)); + + // This is where we can clean up any mess left over by each service thread. + // In this Task, there is nothing to do. + + } + + return 0; + +} + +int Task::svc(void) +{ + + // This is the function that our service threads run once they are spawned. + + ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::svc() -- once per servicing thread\n", d_nameOfTask)); + + // First, we wait until all of our peer service threads have arrived + // at this point also. + + d_barrier.wait(); + + ACE_Message_Block *messageBlock; + + while (1) { + + // And now we loop almost infinitely. + + // getq() will block until a Message_Block is available to be read, + // or an error occurs. + + if ( this->getq(messageBlock, 0) == -1) { + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "Task::svc() getq"), -1); + } + + if (messageBlock->msg_type() == ACE_Message_Block::MB_HANGUP) { + + // If the Message_Block is of type MB_HANGUP, then we're being asked + // to shut down nicely. + + ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::svc() -- HANGUP block received\n", d_nameOfTask)); + + // So, we duplicate the Block, and put it back into the Message_Queue, + // in case there are some more peer service threads still running. + + if (this->putq(messageBlock->duplicate()) == -1) { + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "Task::svc() putq"), -1); + } + + // We release our copy of the Block. + messageBlock->release(); + + // And we break out of the nearly infinitely loop, and + // head towards close() ourselves. + break; + } + + // If we're here, then we've received a Message_Block that was + // not informing us to quit, so we're assuming it's a valid + // meaningful Block. + + ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::svc() -- Normal block received\n", d_nameOfTask)); + + // We grab the read-pointer from the Block, and display it through a DEBUG statement. + + ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::svc() -- %s\n", d_nameOfTask, messageBlock->rd_ptr() )); + + // We pretend that this takes to time to process the Block. + // If you're on a fast machine, you might have to raise this + // value to actually witness different threads handling + // blocks for each Task. + + ACE_OS::sleep (ACE_Time_Value (0, 250)); + + // Since we're part of a Stream, we duplicate the Block, and + // send it on to the next Task. + + if (put_next(messageBlock->duplicate()) == -1) { + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "Task::svc() put_next"), -1); + } + + // And then we release our copy of it. + + messageBlock->release(); + + } + + return 0; + +} + + +const char * Task::nameOfTask(void) const +{ + return d_nameOfTask; +} diff --git a/docs/tutorials/014/Task.h b/docs/tutorials/014/Task.h new file mode 100644 index 00000000000..e4797f49892 --- /dev/null +++ b/docs/tutorials/014/Task.h @@ -0,0 +1,68 @@ + +// $Id$ + +// Task.h +// +// Tutorial regarding a way to use ACE_Stream. +// +// written by bob mcwhirter (bob@netwrench.com) +// +// + +#ifndef TASK_H +#define TASK_H + +#include <ace/Task.h> +#include <ace/Synch.h> + +// Always typedef when possible. + +typedef ACE_Task<ACE_MT_SYNCH> Task_Base; + +class Task : public Task_Base +{ + +public: + + typedef Task_Base inherited; + // This is just good form. + + Task(const char *nameOfTask, + int numberOfThreads); + // Initialize our Task with a name, + // and number of threads to spawn. + + virtual ~Task(void); + + virtual int open(void *arg); + // This is provided to prevent compiler complaints + // about hidden virtual functions. + + virtual int close(u_long flags); + // This closes down the Task and all service threads. + + virtual int put(ACE_Message_Block *message, + ACE_Time_Value *timeout); + // This is the interface that ACE_Stream uses to + // communicate with our Task. + + virtual int svc(void); + // This is the actual service loop each of the service + // threads iterates through. + + const char *nameOfTask(void) const; + // Returns the name of this Task. + +private: + + int d_numberOfThreads; + char d_nameOfTask[64]; + + ACE_Barrier d_barrier; + // Simple Barrier to make sure all of our service + // threads have entered their loop before accepting + // any messages. +}; + + +#endif // TASK_H diff --git a/docs/tutorials/014/page01.html b/docs/tutorials/014/page01.html new file mode 100644 index 00000000000..ac4a0ea6d65 --- /dev/null +++ b/docs/tutorials/014/page01.html @@ -0,0 +1,38 @@ +<HTML> +<HEAD> + <META HTTP-EQUIV="Content-Type" CONTENT="text/html; charset=iso-8859-1"> + <META NAME="Author" CONTENT="Bob McWhirter"> + <TITLE>ACE Tutorial 014</TITLE> +</HEAD> +<BODY TEXT="#000000" BGCOLOR="#FFFFFF" LINK="#000FFF" VLINK="#FF0F0F"> + +<CENTER><B><FONT SIZE=+2>ACE Tutorial 014</FONT></B></CENTER> + +<CENTER><B><FONT SIZE=+2>ACE_Stream Tutorial, Of Sorts</FONT></B></CENTER> + +<P> +<HR WIDTH="100%"> + +<p><b>ACE_Stream</b> is handy when you have several <b>ACE_Task</b> objects +that you would like to link together. + +<p>An intermediate class you we will deal with is the <b>ACE_Module</b>. + +<p>The basic plan is to wrap your <b>Task</b> into a <b>Module</b>, push +the <b>Module</b> onto the <b>Stream</b>. Do this for each <b>Task</b>, + and then inject <b>Message_Block</b>s into the <b>Stream</b>. + +<p>Each <b>Task</b> then processes the <b>Message_Block</b>, and forwards +it on to the next <b>Task</b> in the <b>Stream</b>. + +<p>If you are not already familiar with <b>Message_Block</b>s and <b>Message_Queue</b>s, +I highly suggest that you check out <A HREF="../#MQ">Tutorials 10-13</A>. + +<p>Streams can be used for both downstream and upstream movement of messages. Used +this way mirrors closely the way System V STREAMS work. But you don't have to use them +bidirectionally. In this tutorial, we only use one direction of the Stream. Down. + +<p>This tutorial is contributed by Bob McWhirter (bob@netwrench.com) +<P> +<P><HR WIDTH="100%"> +<CENTER>[<A HREF="..">Tutorial Index</A>] [<A HREF="page02.html">Continue This Tutorial</A>]</CENTER> diff --git a/docs/tutorials/014/page02.html b/docs/tutorials/014/page02.html new file mode 100644 index 00000000000..9dd05c3a0b5 --- /dev/null +++ b/docs/tutorials/014/page02.html @@ -0,0 +1,91 @@ +<HTML> +<HEAD> + <META HTTP-EQUIV="Content-Type" CONTENT="text/html; charset=iso-8859-1"> + <META NAME="Author" CONTENT="Bob McWhirter"> + <TITLE>ACE Tutorial 014</TITLE> +</HEAD> +<BODY TEXT="#000000" BGCOLOR="#FFFFFF" LINK="#000FFF" VLINK="#FF0F0F"> + +<CENTER><B><FONT SIZE=+2>ACE Tutorial 014</FONT></B></CENTER> + +<CENTER><B><FONT SIZE=+2>ACE_Stream Tutorial, Of Sorts</FONT></B></CENTER> + +<P> +<HR WIDTH="100%"> +<P> +You find pretty soon that anytime you work with ACE_Task<> you + have to create a derivative. The Task.h header simply provides + that derivative with the overrides we'll need in our application. +<P> +<HR WIDTH="100%"><PRE> + +// $Id$ + +// Task.h +// +// Tutorial regarding a way to use ACE_Stream. +// +// written by bob mcwhirter (bob@netwrench.com) +// +// + +#ifndef TASK_H +#define TASK_H + +#include <ace/Task.h> +#include <ace/Synch.h> + +// Always typedef when possible. + +typedef ACE_Task<ACE_MT_SYNCH> Task_Base; + +class Task : public Task_Base +{ + +public: + + typedef Task_Base inherited; + // This is just good form. + + Task(const char *nameOfTask, + int numberOfThreads); + // Initialize our Task with a name, + // and number of threads to spawn. + + virtual ~Task(void); + + virtual int open(void *arg); + // This is provided to prevent compiler complaints + // about hidden virtual functions. + + virtual int close(u_long flags); + // This closes down the Task and all service threads. + + virtual int put(ACE_Message_Block *message, + ACE_Time_Value *timeout); + // This is the interface that ACE_Stream uses to + // communicate with our Task. + + virtual int svc(void); + // This is the actual service loop each of the service + // threads iterates through. + + const char *nameOfTask(void) const; + // Returns the name of this Task. + +private: + + int d_numberOfThreads; + char d_nameOfTask[64]; + + ACE_Barrier d_barrier; + // Simple Barrier to make sure all of our service + // threads have entered their loop before accepting + // any messages. +}; + + +#endif // TASK_H +</PRE> +<P><HR WIDTH="100%"> +<CENTER>[<A HREF="..">Tutorial Index</A>] [<A HREF="page03.html">Continue This Tutorial</A>]</CENTER> diff --git a/docs/tutorials/014/page03.html b/docs/tutorials/014/page03.html new file mode 100644 index 00000000000..a0edf26f328 --- /dev/null +++ b/docs/tutorials/014/page03.html @@ -0,0 +1,243 @@ +<HTML> +<HEAD> + <META HTTP-EQUIV="Content-Type" CONTENT="text/html; charset=iso-8859-1"> + <META NAME="Author" CONTENT="Bob McWhirter"> + <TITLE>ACE Tutorial 014</TITLE> +</HEAD> +<BODY TEXT="#000000" BGCOLOR="#FFFFFF" LINK="#000FFF" VLINK="#FF0F0F"> + +<CENTER><B><FONT SIZE=+2>ACE Tutorial 014</FONT></B></CENTER> + +<CENTER><B><FONT SIZE=+2>ACE_Stream Tutorial, Of Sorts</FONT></B></CENTER> + +<P> +<HR WIDTH="100%"> +<P> +Before we get to main() let's take a look at the Task implementation. + While we've overridden several methods, the real work is done in + the close() and svc() methods. +<P> +Notice how close() figures out if it is being called by the shutdown + of the ACE_Stream or by the exit of svc(). The magic here is + provided by the <i>flags</i> parameter. By handling the stream + shutdown in this way, we don't have to do anything strange in + svc(). We also don't end up with extra hangup messages in the + queue when the dust all settles down. +<P> +Like our other tutorials, svc() looks for a hangup and processes data. +<P> +<HR WIDTH="100%"><PRE> + +// $Id$ + +// Task.cxx +// +// Tutorial regarding a way to use ACE_Stream. +// +// written by bob mcwhirter (bob@netwrench.com) +// +// + +#include <ace/Message_Block.h> + +#include "Task.h" + +Task::Task(const char * nameOfTask, + int numberOfThreads) + : d_numberOfThreads(numberOfThreads), + d_barrier(numberOfThreads) +{ + // Just initialize our name, number of threads, and barrier. + + ACE_OS::strcpy(d_nameOfTask, nameOfTask); +} + +Task::~Task(void) +{ + ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::~Task() -- once per Task\n", d_nameOfTask)); +} + +int Task::open(void *arg) +{ + ACE_UNUSED_ARG(arg); + + ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::open() -- once per Task\n", d_nameOfTask)); + + // call ACE_Task::activate() to spawn the threads using + // our Task::svc() as the function to be run. + + // FMM -- Frequently Made Mistake -- + // + // If you specify the flag THR_DETACHED when activating the + // Task, you will get an assert() violation during close(), + // since the Task waits for all of its threads to rejoin. + // + + return this->activate(THR_NEW_LWP, + d_numberOfThreads); +} + +int Task::put(ACE_Message_Block *message, + ACE_Time_Value *timeout) +{ + // ACE_Stream uses the put() method of Tasks to send messages. + // This defaultly does nothing. Here we link our put() method + // directly to our putq() method, so that Messages put() to us + // will appear in the Message_Queue that is checked by the + // service threads. + + return this->putq(message, timeout); +} + +int Task::close(u_long flags) +{ + + // When the Stream closes the Module, the Module then close()'s the Task + // and passing a value of (1) as the flag. + + // When a service thread exits, it calls close() with a value that is not + // (1). + + // We use this fact to tell the difference between closing a service thread, + // and closing the main Task itself. + + if (flags == 1) { + + // The Module has asked to close the main Task. + + ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::close() -- flags == 1 -- once per Task\n", d_nameOfTask)); + + // We create a Message_Block... + + ACE_Message_Block *hangupBlock = new ACE_Message_Block(); + + // And make it of the type MB_HANGUP. + + hangupBlock->msg_type(ACE_Message_Block::MB_HANGUP); + + // We then send this Block into the Message_Queue to be seen by the + // service threads. + + // Once again we duplicate() the Block as send it off... + + if (this->putq(hangupBlock->duplicate()) == -1) { + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "Task::close() putq"), -1); + } + + // ..and we're free to release() our copy of it. + + hangupBlock->release(); + + // Now, all we have to do is wait() for the service threads to all + // exit. This is where using THR_DETACHED in the activate() method + // will come back to haunt you. + + // The Stream waits until this returns before attempting to remove + // the next Module/Task group in the Stream. This allows for an + // orderly shutting down of the Stream. + + return this->wait(); + + + } else { + + ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::close() -- flags != 1 -- once per servicing thread\n", d_nameOfTask)); + + // This is where we can clean up any mess left over by each service thread. + // In this Task, there is nothing to do. + + } + + return 0; + +} + +int Task::svc(void) +{ + + // This is the function that our service threads run once they are spawned. + + ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::svc() -- once per servicing thread\n", d_nameOfTask)); + + // First, we wait until all of our peer service threads have arrived + // at this point also. + + d_barrier.wait(); + + ACE_Message_Block *messageBlock; + + while (1) { + + // And now we loop almost infinitely. + + // getq() will block until a Message_Block is available to be read, + // or an error occurs. + + if ( this->getq(messageBlock, 0) == -1) { + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "Task::svc() getq"), -1); + } + + if (messageBlock->msg_type() == ACE_Message_Block::MB_HANGUP) { + + // If the Message_Block is of type MB_HANGUP, then we're being asked + // to shut down nicely. + + ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::svc() -- HANGUP block received\n", d_nameOfTask)); + + // So, we duplicate the Block, and put it back into the Message_Queue, + // in case there are some more peer service threads still running. + + if (this->putq(messageBlock->duplicate()) == -1) { + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "Task::svc() putq"), -1); + } + + // We release our copy of the Block. + messageBlock->release(); + + // And we break out of the nearly infinitely loop, and + // head towards close() ourselves. + break; + } + + // If we're here, then we've received a Message_Block that was + // not informing us to quit, so we're assuming it's a valid + // meaningful Block. + + ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::svc() -- Normal block received\n", d_nameOfTask)); + + // We grab the read-pointer from the Block, and display it through a DEBUG statement. + + ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::svc() -- %s\n", d_nameOfTask, messageBlock->rd_ptr() )); + + // We pretend that this takes to time to process the Block. + // If you're on a fast machine, you might have to raise this + // value to actually witness different threads handling + // blocks for each Task. + + ACE_OS::sleep (ACE_Time_Value (0, 250)); + + // Since we're part of a Stream, we duplicate the Block, and + // send it on to the next Task. + + if (put_next(messageBlock->duplicate()) == -1) { + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "Task::svc() put_next"), -1); + } + + // And then we release our copy of it. + + messageBlock->release(); + + } + + return 0; + +} + + +const char * Task::nameOfTask(void) const +{ + return d_nameOfTask; +} +</PRE> +<P><HR WIDTH="100%"> +<CENTER>[<A HREF="..">Tutorial Index</A>] [<A HREF="page04.html">Continue This Tutorial</A>]</CENTER> diff --git a/docs/tutorials/014/page04.html b/docs/tutorials/014/page04.html new file mode 100644 index 00000000000..c173178dbc2 --- /dev/null +++ b/docs/tutorials/014/page04.html @@ -0,0 +1,108 @@ +<HTML> +<HEAD> + <META HTTP-EQUIV="Content-Type" CONTENT="text/html; charset=iso-8859-1"> + <META NAME="Author" CONTENT="Bob McWhirter"> + <TITLE>ACE Tutorial 014</TITLE> +</HEAD> +<BODY TEXT="#000000" BGCOLOR="#FFFFFF" LINK="#000FFF" VLINK="#FF0F0F"> + +<CENTER><B><FONT SIZE=+2>ACE Tutorial 014</FONT></B></CENTER> + +<CENTER><B><FONT SIZE=+2>ACE_Stream Tutorial, Of Sorts</FONT></B></CENTER> + +<P> +<HR WIDTH="100%"> +<P> +As stated in the comments below, the default action of the task at the + stream tail is to treat any received data as an error. In our + implementation it will often happen that data gets through to + the tail. How, then, do we handle this without creating an + error condition? Simple: Create a custom Task for use as the + stream tail that doesn't consider it an error to receive data. +<P> +Read on... +<P> +<HR WIDTH="100%"><PRE> + +// $Id$ + +// EndTask.h +// +// Tutorial regarding a way to use ACE_Stream. +// +// written by bob mcwhirter (bob@netwrench.com) +// +// + +#ifndef ENDTASK_H +#define ENDTASK_H + +#include "Task.h" + +// When you setup a Stream and push your modules on, +// there are two additional modules that go unseen +// by the user. +// +// The Stream pushes on a Stream_Head in front of +// your first module, and a Stream_Tail behind your +// last module. +// +// If your put() a message to the Stream Tail, it +// assumes you did so in error. This simple EndTask +// class allows you to push a message to it and just +// have it safely Go Away. +// +// All this Task does is release the Message_Block +// and return 0. It's a suitable black-hole. + + +class EndTask : public Task +{ + +public: + + typedef Task inherited; + + EndTask(const char *nameOfTask) : + inherited(nameOfTask, 0) { + + // when we get open()'d, it with 0 threads + // since there is actually no processing to do. + + cerr << __LINE__ << " " << __FILE__ << endl; + }; + + virtual int open(void *) + { + cerr << __LINE__ << " " << __FILE__ << endl; + return 0; + } + + virtual int open(void) + { + cerr << __LINE__ << " " << __FILE__ << endl; + return 0; + } + + virtual ~EndTask(void) { + }; + + virtual int put(ACE_Message_Block *message, + ACE_Time_Value *timeout) { + + cerr << __LINE__ << " " << __FILE__ << endl; + ACE_UNUSED_ARG(timeout); + + // we don't have anything to do, so + // release() the message. + ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s EndTask::put() -- releasing Message_Block\n", this->nameOfTask())); + message->release(); + return 0; + }; + +}; + +#endif // ENDTASK_H +</PRE> +<P><HR WIDTH="100%"> +<CENTER>[<A HREF="..">Tutorial Index</A>] [<A HREF="page05.html">Continue This Tutorial</A>]</CENTER> diff --git a/docs/tutorials/014/page05.html b/docs/tutorials/014/page05.html new file mode 100644 index 00000000000..c654dd69a83 --- /dev/null +++ b/docs/tutorials/014/page05.html @@ -0,0 +1,215 @@ +<HTML> +<HEAD> + <META HTTP-EQUIV="Content-Type" CONTENT="text/html; charset=iso-8859-1"> + <META NAME="Author" CONTENT="Bob McWhirter"> + <TITLE>ACE Tutorial 014</TITLE> +</HEAD> +<BODY TEXT="#000000" BGCOLOR="#FFFFFF" LINK="#000FFF" VLINK="#FF0F0F"> + +<CENTER><B><FONT SIZE=+2>ACE Tutorial 014</FONT></B></CENTER> + +<CENTER><B><FONT SIZE=+2>ACE_Stream Tutorial, Of Sorts</FONT></B></CENTER> + +<P> +<HR WIDTH="100%"> +<P> +Now we come to main(). In the previous task-chain tutorial + every thread pool had to have the same number of threads. This + time around, we leverage the construction method of ACE_Stream + and ACE_Module to customize the thread-pool size in each + ACE_Task of the stream. +<P> +Remember EndTask from the previous page? We create one here and push + it into the stream to take care of cleaning up the messages. + Technically, we could have replaced the default Tail task + created by the ACE framework but it seems to make more sense to + just push our "tail" onto the stream like the other tasks. The + caveat to this method is that you must be sure you don't push() + any other Modules behind the EndTask! +<P> +Once the stream of modules containing tasks is all setup then we can + put() some data into the stream for processing. The clever use + of Task::close() makes shutting downt the stream easier than + ever. No messing with hangup messages at the application level, + just close() when you're done! What could be simpler? +<P> +<HR WIDTH="100%"> +<PRE> + +// $Id$ + +// stream.cxx +// +// Tutorial regarding a way to use ACE_Stream. +// +// written by bob mcwhirter (bob@netwrench.com) +// +// + +#include "Task.h" +#include "EndTask.h" +// This is our specialized ACE_Task. + +#include <ace/Module.h> +#include <ace/Stream.h> +// These are the neccessary ACE headers. + + +typedef ACE_Module<ACE_MT_SYNCH> Module; +typedef ACE_Stream<ACE_MT_SYNCH> Stream; +// Just to avoid a lot of typing, typedefs +// are generally a good idea. + +int main(int argc, char *argv[]) +{ + cerr << __LINE__ << endl; + + int numberOfMessages = argc > 1 ? ACE_OS::atoi(argv[1]) : 3; + // unless otherwise specified, just send three messages + // down the stream. + + Stream theStream; + // the ACE_Stream itself. + + cerr << __LINE__ << endl; + + // Now, we instantiate 4 different Tasks. These do not + // need to be all the same class, but they do need to + // all derrive from the same flavor of ACE_Task. + // + // Also, we instantiate a fifth end-cap Task to clean + // up Message_Blocks as they reach the end. + + Task *taskOne; + Task *taskTwo; + Task *taskThree; + Task *taskFour; + Task *taskEnd; + + // Out Task's take two arguments: a name, and the number + // of threads to dedicate to the task. + + taskOne = new Task("Task No. 1", 1); + taskTwo = new Task("Task No. 2", 3); + taskThree = new Task("Task No. 3", 7); + taskFour = new Task("Task No. 4", 1); + + // Our EndTask only takes 1 argument, as it actually + // doesn't spawn any threads for processing. + + taskEnd = new EndTask("End Task"); + + Module *moduleOne; + Module *moduleTwo; + Module *moduleThree; + Module *moduleFour; + Module *moduleEnd; + + // ACE_Stream accepts ACE_Modules, which are simply a pair of + // ACE_Tasks. One is dedicated for writing, while the other + // is dedicated to reading. Think of the writing side as + // downstream, and the reading side as upstream. + // + // We're only working with a unidirection Stream today, + // so we'll only actually install a Task into the write + // side of the module, effectively downstream. + + cerr << __LINE__ << endl; + moduleOne = new Module("Module No. 1", taskOne); + moduleTwo = new Module("Module No. 2", taskTwo); + moduleThree = new Module("Module No. 3", taskThree); + moduleFour = new Module("Module No. 4", taskFour); + moduleEnd = new Module("Module End", taskEnd); + + cerr << __LINE__ << endl; + // Now we push the Modules onto the Stream. + // Pushing adds the module to the head, or + // otherwise prepends it to whatever modules + // are already installed. + + // So, you need to push() the modules on -backwards- + // from our viewpoint. + + if (theStream.push(moduleEnd) == -1) { + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), -1); + } + + cerr << __LINE__ << endl; + if (theStream.push(moduleFour) == -1) { + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), -1); + } + + // As we push a Module onto the Stream, it gets opened. + // When a Module open()s, it opens the Tasks that it contains. + // + // Since we cannot provide an argument to this embedded + // call to open(), we supplied specified the number of + // threads in the constructor of our Tasks. + + if (theStream.push(moduleThree) == -1) { + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), -1); + } + + if (theStream.push(moduleTwo) == -1) { + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), -1); + } + + if (theStream.push(moduleOne) == -1) { + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), -1); + } + + cerr << __LINE__ << endl; + // Now that the Modules are open, the Tasks threads should + // be launching and entering their svc() loop, so we send + // some messages down the Stream. + + int sent = 1; + + ACE_Message_Block *message; + + while (sent <= numberOfMessages) { + + // First, create ourselves a Message_Block. + // see Tutorials 10-13 for more information + // about Message_Blocks and Message_Queues. + + message = new ACE_Message_Block(128); + + // Now, we grab the write-pointer from the Block, + // and sprintf() our text into it. + + ACE_OS::sprintf(message->wr_ptr(), "Message No. %d", sent); + + // All we have to do now is drop the Message_Block + // into the Stream. + + // It is always a good idea to duplicate() a Message_Block + // when you put it into any Message_Queue, as then + // you can always be allowed to release() your copy + // without worry. + + if (theStream.put(message->duplicate(), 0) == -1) { + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "put"), -1); + } + + message->release(); + ++sent; + } + + // Now that we've sent our Message_Blocks, close down + // the Stream. + // + // The Stream will automagically delete the Modules and + // the contained Tasks. We don't have to do that. + // + // This call will block (due to the way we've written our + // Task class) until all Message_Blocks have cleared the + // entire Stream, and all associated threads have exited. + + theStream.close(); + + return 0; +} +</PRE> +<P><HR WIDTH="100%"> +<CENTER>[<A HREF="..">Tutorial Index</A>] [<A HREF="page06.html">Continue This Tutorial</A>]</CENTER> diff --git a/docs/tutorials/014/page06.html b/docs/tutorials/014/page06.html new file mode 100644 index 00000000000..01e5fef889c --- /dev/null +++ b/docs/tutorials/014/page06.html @@ -0,0 +1,28 @@ +<HTML> +<HEAD> + <META HTTP-EQUIV="Content-Type" CONTENT="text/html; charset=iso-8859-1"> + <META NAME="Author" CONTENT="Bob McWhirter"> + <TITLE>ACE Tutorial 014</TITLE> +</HEAD> +<BODY TEXT="#000000" BGCOLOR="#FFFFFF" LINK="#000FFF" VLINK="#FF0F0F"> + +<CENTER><B><FONT SIZE=+2>ACE Tutorial 014</FONT></B></CENTER> + +<CENTER><B><FONT SIZE=+2>ACE_Stream Tutorial, Of Sorts</FONT></B></CENTER> + +<P> +<HR WIDTH="100%"> + +Ok, so that's the Stream tutorial. As you can see, it's much simpler + than the task-chain developed last time but at the same time it + is also much more extensible. + +<UL> +<LI><A HREF="Makefile">Makefile</A> +<LI><A HREF="Task.h">Task.h</A> +<LI><A HREF="Task.cpp">Task.cpp</A> +<LI><A HREF="EndTask.h">EndTask.h</A> +<LI><A HREF="stream.cpp">stream.cpp</A> +</UL> +<P><HR WIDTH="100%"> +<CENTER>[<A HREF="..">Tutorial Index</A>] </CENTER> diff --git a/docs/tutorials/014/stream.cpp b/docs/tutorials/014/stream.cpp new file mode 100644 index 00000000000..4925bd5cf10 --- /dev/null +++ b/docs/tutorials/014/stream.cpp @@ -0,0 +1,175 @@ + +// $Id$ + +// stream.cxx +// +// Tutorial regarding a way to use ACE_Stream. +// +// written by bob mcwhirter (bob@netwrench.com) +// +// + +#include "Task.h" +#include "EndTask.h" +// This is our specialized ACE_Task. + +#include <ace/Module.h> +#include <ace/Stream.h> +// These are the neccessary ACE headers. + + +typedef ACE_Module<ACE_MT_SYNCH> Module; +typedef ACE_Stream<ACE_MT_SYNCH> Stream; +// Just to avoid a lot of typing, typedefs +// are generally a good idea. + +int main(int argc, char *argv[]) +{ + cerr << __LINE__ << endl; + + int numberOfMessages = argc > 1 ? ACE_OS::atoi(argv[1]) : 3; + // unless otherwise specified, just send three messages + // down the stream. + + Stream theStream; + // the ACE_Stream itself. + + cerr << __LINE__ << endl; + + // Now, we instantiate 4 different Tasks. These do not + // need to be all the same class, but they do need to + // all derrive from the same flavor of ACE_Task. + // + // Also, we instantiate a fifth end-cap Task to clean + // up Message_Blocks as they reach the end. + + Task *taskOne; + Task *taskTwo; + Task *taskThree; + Task *taskFour; + Task *taskEnd; + + // Out Task's take two arguments: a name, and the number + // of threads to dedicate to the task. + + taskOne = new Task("Task No. 1", 1); + taskTwo = new Task("Task No. 2", 3); + taskThree = new Task("Task No. 3", 7); + taskFour = new Task("Task No. 4", 1); + + // Our EndTask only takes 1 argument, as it actually + // doesn't spawn any threads for processing. + + taskEnd = new EndTask("End Task"); + + Module *moduleOne; + Module *moduleTwo; + Module *moduleThree; + Module *moduleFour; + Module *moduleEnd; + + // ACE_Stream accepts ACE_Modules, which are simply a pair of + // ACE_Tasks. One is dedicated for writing, while the other + // is dedicated to reading. Think of the writing side as + // downstream, and the reading side as upstream. + // + // We're only working with a unidirection Stream today, + // so we'll only actually install a Task into the write + // side of the module, effectively downstream. + + cerr << __LINE__ << endl; + moduleOne = new Module("Module No. 1", taskOne); + moduleTwo = new Module("Module No. 2", taskTwo); + moduleThree = new Module("Module No. 3", taskThree); + moduleFour = new Module("Module No. 4", taskFour); + moduleEnd = new Module("Module End", taskEnd); + + cerr << __LINE__ << endl; + // Now we push the Modules onto the Stream. + // Pushing adds the module to the head, or + // otherwise prepends it to whatever modules + // are already installed. + + // So, you need to push() the modules on -backwards- + // from our viewpoint. + + if (theStream.push(moduleEnd) == -1) { + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), -1); + } + + cerr << __LINE__ << endl; + if (theStream.push(moduleFour) == -1) { + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), -1); + } + + // As we push a Module onto the Stream, it gets opened. + // When a Module open()s, it opens the Tasks that it contains. + // + // Since we cannot provide an argument to this embedded + // call to open(), we supplied specified the number of + // threads in the constructor of our Tasks. + + if (theStream.push(moduleThree) == -1) { + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), -1); + } + + if (theStream.push(moduleTwo) == -1) { + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), -1); + } + + if (theStream.push(moduleOne) == -1) { + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), -1); + } + + cerr << __LINE__ << endl; + // Now that the Modules are open, the Tasks threads should + // be launching and entering their svc() loop, so we send + // some messages down the Stream. + + int sent = 1; + + ACE_Message_Block *message; + + while (sent <= numberOfMessages) { + + // First, create ourselves a Message_Block. + // see Tutorials 10-13 for more information + // about Message_Blocks and Message_Queues. + + message = new ACE_Message_Block(128); + + // Now, we grab the write-pointer from the Block, + // and sprintf() our text into it. + + ACE_OS::sprintf(message->wr_ptr(), "Message No. %d", sent); + + // All we have to do now is drop the Message_Block + // into the Stream. + + // It is always a good idea to duplicate() a Message_Block + // when you put it into any Message_Queue, as then + // you can always be allowed to release() your copy + // without worry. + + if (theStream.put(message->duplicate(), 0) == -1) { + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "put"), -1); + } + + message->release(); + ++sent; + } + + // Now that we've sent our Message_Blocks, close down + // the Stream. + // + // The Stream will automagically delete the Modules and + // the contained Tasks. We don't have to do that. + // + // This call will block (due to the way we've written our + // Task class) until all Message_Blocks have cleared the + // entire Stream, and all associated threads have exited. + + theStream.close(); + + return 0; +} |