diff options
author | jcej <jcej@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1998-09-23 18:25:33 +0000 |
---|---|---|
committer | jcej <jcej@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1998-09-23 18:25:33 +0000 |
commit | cac79464628f0cf7bd59da38c66e7f5891486e9c (patch) | |
tree | 1eb57843170979a94dbc0a6d61cf1ad2b55da303 /docs | |
parent | eff4bc7b96c60ea4cf3fbebd914c53817d5be264 (diff) | |
download | ATCD-cac79464628f0cf7bd59da38c66e7f5891486e9c.tar.gz |
*** empty log message ***
Diffstat (limited to 'docs')
-rw-r--r-- | docs/tutorials/005/server.cpp | 8 | ||||
-rw-r--r-- | docs/tutorials/006/server.cpp | 8 | ||||
-rw-r--r-- | docs/tutorials/007/server.cpp | 12 | ||||
-rw-r--r-- | docs/tutorials/008/Makefile | 2 | ||||
-rw-r--r-- | docs/tutorials/009/Makefile | 2 | ||||
-rw-r--r-- | docs/tutorials/013/Makefile | 60 | ||||
-rw-r--r-- | docs/tutorials/013/block.cpp | 97 | ||||
-rw-r--r-- | docs/tutorials/013/block.h | 85 | ||||
-rw-r--r-- | docs/tutorials/013/data.h | 151 | ||||
-rw-r--r-- | docs/tutorials/013/message_queue.cpp | 77 | ||||
-rw-r--r-- | docs/tutorials/013/mld.cpp | 24 | ||||
-rw-r--r-- | docs/tutorials/013/mld.h | 44 | ||||
-rw-r--r-- | docs/tutorials/013/page01.html | 41 | ||||
-rw-r--r-- | docs/tutorials/013/page02.html | 112 | ||||
-rw-r--r-- | docs/tutorials/013/page03.html | 111 | ||||
-rw-r--r-- | docs/tutorials/013/page04.html | 38 | ||||
-rw-r--r-- | docs/tutorials/013/task.cpp | 190 | ||||
-rw-r--r-- | docs/tutorials/013/task.h | 48 | ||||
-rw-r--r-- | docs/tutorials/013/work.cpp | 125 | ||||
-rw-r--r-- | docs/tutorials/013/work.h | 67 |
20 files changed, 1298 insertions, 4 deletions
diff --git a/docs/tutorials/005/server.cpp b/docs/tutorials/005/server.cpp index 9f5803f8d46..ee46a9d2b57 100644 --- a/docs/tutorials/005/server.cpp +++ b/docs/tutorials/005/server.cpp @@ -111,3 +111,11 @@ int main (int argc, char *argv[]) return 0; } + +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) +template class ACE_Acceptor <Client_Handler, ACE_SOCK_ACCEPTOR>; +template class ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH>; +#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) +#pragma instantiate ACE_Acceptor <Client_Handler, ACE_SOCK_ACCEPTOR> +#pragma instantiate ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH> +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/docs/tutorials/006/server.cpp b/docs/tutorials/006/server.cpp index d0a1e0f42b3..2547b065193 100644 --- a/docs/tutorials/006/server.cpp +++ b/docs/tutorials/006/server.cpp @@ -110,3 +110,11 @@ int main (int argc, char *argv[]) return 0; } + +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) +template class ACE_Acceptor <Client_Handler, ACE_SOCK_ACCEPTOR>; +template class ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH>; +#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) +#pragma instantiate ACE_Acceptor <Client_Handler, ACE_SOCK_ACCEPTOR> +#pragma instantiate ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH> +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/docs/tutorials/007/server.cpp b/docs/tutorials/007/server.cpp index 14ef85c44d1..55fb69c58ef 100644 --- a/docs/tutorials/007/server.cpp +++ b/docs/tutorials/007/server.cpp @@ -105,3 +105,15 @@ int main (int argc, char *argv[]) return 0; } + +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) +template class ACE_Acceptor <Client_Handler, ACE_SOCK_ACCEPTOR>; +template class ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH>; +template class ACE_Guard<ACE_Mutex>; +template class ACE_Atomic_Op<ACE_Mutex, int>; +#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) +#pragma instantiate ACE_Acceptor <Client_Handler, ACE_SOCK_ACCEPTOR> +#pragma instantiate ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH> +#pragma instantiate ACE_Guard<ACE_Mutex> +#pragma instantiate ACE_Atomic_Op<ACE_Mutex, int> +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/docs/tutorials/008/Makefile b/docs/tutorials/008/Makefile index 59ac6bca63f..0b92cbfcc8c 100644 --- a/docs/tutorials/008/Makefile +++ b/docs/tutorials/008/Makefile @@ -12,8 +12,6 @@ FILES = BUILD = $(VBIN) -# SRC = $(addsuffix .cpp,$(BIN)) $(addsuffix .cpp,$(FILES)) - HDR = *.h #---------------------------------------------------------------------------- diff --git a/docs/tutorials/009/Makefile b/docs/tutorials/009/Makefile index 2bf85af82b7..553c77d6251 100644 --- a/docs/tutorials/009/Makefile +++ b/docs/tutorials/009/Makefile @@ -12,8 +12,6 @@ FILES = BUILD = $(VBIN) -SRC = $(addsuffix .cpp,$(BIN)) - HDR = *.h #---------------------------------------------------------------------------- diff --git a/docs/tutorials/013/Makefile b/docs/tutorials/013/Makefile new file mode 100644 index 00000000000..9b6fc0b3633 --- /dev/null +++ b/docs/tutorials/013/Makefile @@ -0,0 +1,60 @@ + +# $Id$ + +#---------------------------------------------------------------------------- +# Local macros +#---------------------------------------------------------------------------- + +BIN = message_queue + +FILES = task block work mld + +BUILD = $(VBIN) + +SRC = $(addsuffix .cpp,$(BIN)) +SRC += $(addsuffix .cpp,$(FILES)) + +HDR = *.h + +#---------------------------------------------------------------------------- +# Include macros and targets +#---------------------------------------------------------------------------- + +include $(ACE_ROOT)/include/makeinclude/wrapper_macros.GNU +include $(ACE_ROOT)/include/makeinclude/macros.GNU +include $(ACE_ROOT)/include/makeinclude/rules.common.GNU +include $(ACE_ROOT)/include/makeinclude/rules.nonested.GNU +include $(ACE_ROOT)/include/makeinclude/rules.lib.GNU +include $(ACE_ROOT)/include/makeinclude/rules.bin.GNU +include $(ACE_ROOT)/include/makeinclude/rules.local.GNU + +#---------------------------------------------------------------------------- +# Local targets +#---------------------------------------------------------------------------- + +Indent : # + for i in $(SRC) $(HDR) ; do \ + indent -npsl -l80 -fca -fc1 -cli0 -cdb -ts2 -bl -bli0 < $$i | \ + sed -e 's/: :/::/g' \ + -e 's/^.*\(public:\)/\1/' \ + -e 's/^.*\(protected:\)/\1/' \ + -e 's/^.*\(private:\)/\1/' \ + -e 's/:\(public\)/ : \1/' \ + -e 's/:\(protected\)/ : \1/' \ + -e 's/:\(private\)/ : \1/' \ + -e 's/ / /g' \ + > $$i~ ;\ + mv $$i~ $$i ;\ + done + +Depend : depend + perl ../007/fix.Makefile + +.depend : # + touch .depend + +#---------------------------------------------------------------------------- +# Dependencies +#---------------------------------------------------------------------------- + +include .depend diff --git a/docs/tutorials/013/block.cpp b/docs/tutorials/013/block.cpp new file mode 100644 index 00000000000..5ca92e330f6 --- /dev/null +++ b/docs/tutorials/013/block.cpp @@ -0,0 +1,97 @@ + +// $Id$ + +#include "block.h" + +/* + Construct a Dat_Block to contain a unit of work. Note the careful + construction of the baseclass to set the block type and the locking strategy. + */ +Data_Block::Data_Block( Unit_Of_Work * _data ) + : inherited(0,ACE_Message_Block::MB_DATA,0,0,new Lock(),0,0) + ,data_(_data) +{ + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Data_Block ctor for 0x%x\n", (void *) this, (void*)data_)); +} + +/* + The Lock object created in the constructor is stored in the + baseclass and available through the locking_strategy() method. We + can cast it's value to our Lock object and invoke the destroy() to + indicate that we want it to go away when the lock is released. + */ +Data_Block::~Data_Block(void) +{ + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Data_Block dtor for 0x%x\n", (void *) this, (void*)data_)); + ((Lock*)locking_strategy())->destroy(); + delete data_; +} + +/* + Return the data +*/ +Unit_Of_Work * Data_Block::data(void) +{ + return this->data_; +} + +Data_Block::Lock::Lock(void) + : destroy_(0) +{ + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Lock ctor\n", (void *) this )); +} + +Data_Block::Lock::~Lock(void) +{ + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Lock dtor\n", (void *) this )); +} + +/* + Set our destroy_ flag so that the next lock release will cause us to + be deleted. +*/ +int Data_Block::Lock::destroy(void) +{ + ++destroy_; + return(0); +} + +/* + Mutexes have acquire() and release() methods. We've overridden the + latter so that when the object we're protecting goes away, we can + make ourselves go away after the lock is released. +*/ +int Data_Block::Lock::release(void) +{ + int rval = inherited::release(); + if( destroy_ ) + { + delete this; + } + return rval; +} + +/* + Create an baseclas unit of work when we instantiate a hangup message +*/ +Message_Block::Message_Block( void ) + : inherited( new Data_Block( new Unit_Of_Work() ) ) +{ + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Message_Block ctor for shutdown\n", (void *) this )); + this->msg_type( MB_HANGUP ); +} + +/* + Store the unit of work in a Data_Block and initialize the baseclass + with that data. +*/ +Message_Block::Message_Block( Unit_Of_Work * _data ) + : inherited( new Data_Block(_data) ) +{ + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Message_Block ctor for 0x%x\n", (void *) this, (void*)_data)); +} + +Message_Block::~Message_Block( void ) +{ + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Message_Block dtor\n", (void *) this )); +} diff --git a/docs/tutorials/013/block.h b/docs/tutorials/013/block.h new file mode 100644 index 00000000000..e6e05c5312c --- /dev/null +++ b/docs/tutorials/013/block.h @@ -0,0 +1,85 @@ + +// $Id$ + +#ifndef BLOCK_H +#define BLOCK_H + +#include "ace/Message_Block.h" +#include "ace/Synch.h" +#include "mld.h" +#include "work.h" + +/* + In this Tutorial, we derive from ACE_Data_Block for our special + data. With the possiblilty that our Task object may forward the + unit of work on to another thread pool, we have to make sure that + the data object doesn't go out of scope unexpectedly. An + ACE_Message_Block will be deleted as soon as it's release() method + is called but the ACE_Data_Blocks it uses are reference counted and + only delete when the last reference release()es the block. We use + that trait to simply our object memory management. + */ +class Data_Block : public ACE_Data_Block +{ +public: + typedef ACE_Data_Block inherited; + + // Create a data block with a unit of work to be done + Data_Block( Unit_Of_Work * _data ); + + ~Data_Block(void); + + // Returns the work pointer + Unit_Of_Work * data(void); + +protected: + Unit_Of_Work * data_; + MLD; // Our memory leak detector + + // The ACE_Data_Block allows us to choose a locking strategy + // for making the reference counting thread-safe. The + // ACE_Lock_Adaptor<> template adapts the interface of a + // number of lock objects so thatthe ACE_Message_Block will + // have an interface it can use. + class Lock : public ACE_Lock_Adapter<ACE_Mutex> + { + public: + typedef ACE_Lock_Adapter<ACE_Mutex> inherited; + + Lock(void); + ~Lock(void); + + // When the Data_Block is destroyed, the Message_Block is + // holding a lock with this object. If we were to destroy + // the Lock with the Data_Block, we would have a + // segfault. Instead, the Data_Block invokes destroy() to + // mark the object as un-needed so that when the + // Message_Block invokes release() to drop the lock, the + // Lock can delete itself. + int destroy(void); + int release(void); + protected: + int destroy_; + MLD; + }; +}; + +/* + This simple derivative of ACE_Message_Block will construct our + Data_Block object to contain a unit of work. + */ +class Message_Block : public ACE_Message_Block +{ +public: + typedef ACE_Message_Block inherited; + + Message_Block( void ); + Message_Block( Unit_Of_Work * _data ); + + ~Message_Block( void ); + +protected: + MLD; +}; + +#endif diff --git a/docs/tutorials/013/data.h b/docs/tutorials/013/data.h new file mode 100644 index 00000000000..1d15e6ad9e0 --- /dev/null +++ b/docs/tutorials/013/data.h @@ -0,0 +1,151 @@ + +// $Id$ + +#ifndef DATA_H +#define DATA_H + +#include "ace/Message_Block.h" +#include "ace/Synch.h" + +class Data_Base +{ +public: + Data_Base (void) + : state_(0) + { + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Data_Base ctor\n", (void *) this)); + ++self_; + ++instance_count_; + } + virtual ~ Data_Base (void) + { + if( --self_ < 0 ) + foo(); + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Data_Base dtor (%d) (%d)\n", (void *) this, self_.value(), --instance_count_)); + } + + void foo(void) + { + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Data_Base FOO\n", (void *) this )); + } + + void who_am_i (void) + { + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Data_Base instance\n", (void *) this)); + } + + virtual void what_am_i (void) + { + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x I am a Data_Base object\n", (void*)this)); + } + + virtual int process(void) + { + switch( ++state_ ) + { + case 1: + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Stage One\n",(void*)this)); + break; + case 2: + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Stage Two\n",(void*)this)); + break; + case 3: + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Stage Three\n",(void*)this)); + break; + } + return(0); + } + +protected: + ACE_Atomic_Op<ACE_Mutex,int> state_; + ACE_Atomic_Op<ACE_Mutex,int> self_; + static ACE_Atomic_Op<ACE_Mutex,int> instance_count_; +}; + +class Data : public Data_Base +{ +public: + Data (void) + : message_ (-1) + { + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Data ctor\n", (void *) this)); + } + + Data (int message) + : message_ (message) + { + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Data ctor for message %d\n", (void *) this, message_)); + } + virtual ~ Data (void) + { + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Data dtor\n", (void *) this)); + } + + void what_am_i (void) + { + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x I am a Data object for message %d\n", (void*)this, message_)); + } + +protected: + int message_; + +}; + +class Data_Block : public ACE_Data_Block +{ +public: + typedef ACE_Data_Block inherited; + typedef ACE_Lock_Adapter<ACE_Mutex> lock_t; + + Data_Block( Data_Base * _data ) + : inherited(0,ACE_Message_Block::MB_DATA,0,0,new lock_t(),0,0) // 4.5.39 +// : inherited(0,ACE_Message_Block::MB_DATA,0,0,0,0) // 4.4.32 -- OK +// : inherited() // 4.4.32 -- OK , 4.5.39 -- SEGV + ,data_(_data) + { + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Data_Block ctor for 0x%x\n", (void *) this, (void*)data_)); + } + ~Data_Block(void) + { + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Data_Block dtor for 0x%x\n", (void *) this, (void*)data_)); + delete data_; + } + + Data_Base * data(void) + { + return this->data_; + } + +protected: + Data_Base * data_; +}; + +class Message_Block : public ACE_Message_Block +{ +public: + typedef ACE_Message_Block inherited; + + Message_Block( void ) + : inherited( new Data_Block( new Data_Base() ) ) + { + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Message_Block ctor for shutdown\n", (void *) this )); + this->msg_type( MB_HANGUP ); + } + Message_Block( Data_Base * _data ) + : inherited( new Data_Block(_data) ) + { + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Message_Block ctor for 0x%x\n", (void *) this, (void*)_data)); + } + + ~Message_Block( void ) + { + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Message_Block dtor\n", (void *) this )); + } + + Data_Base * data(void) + { + return ((Data_Block*)this->data_block())->data(); + } +}; + +#endif diff --git a/docs/tutorials/013/message_queue.cpp b/docs/tutorials/013/message_queue.cpp new file mode 100644 index 00000000000..8d65c857c24 --- /dev/null +++ b/docs/tutorials/013/message_queue.cpp @@ -0,0 +1,77 @@ + +// $Id$ + +#include "mld.h" +#include "task.h" +#include "work.h" +#include "block.h" + +int run_test (int iterations, int threads, int subtasks) +{ + // Create a task with some subtasks. Each Task is a thread + // pool of 'threads' size. If a task has a subtask, it will + // forward the unit of work to the subtask when finished. See + // task.{h|cpp} for more details. + Task * task = new Task(subtasks); + + if (task->open (threads) == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), -1); + } + + // Give the threads a chance to get ready. + ACE_OS::sleep (ACE_Time_Value (1)); + + for (int i = 0; i < iterations; ++i) + { + // Create a custom message block that can contain our Work object + Message_Block *message = new Message_Block( new Work(i) ); + + // Put the "unit of work" into the message queue + if (task->putq (message) == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "putq"), -1); + } + } + + // The default constructor of our custom message block will + // insert a message telling our task to shutdown. + Message_Block *message = new Message_Block( ); + + // Put the shutdown request into the thread pool + if (task->putq (message) == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "putq"), -1); + } + + // Wait for the task to shut down. Any subtasks will also be + // waited for. + task->wait (); + + // Delete our Task to prevent a memory leak + delete task; + + // Ask our memory leak detector if things are OK + if( MLD_COUNTER->value() != 0 ) + { + ACE_DEBUG ((LM_DEBUG, "(%P|%t) Memory Leak!\n")); + } + + return (0); +} + +int main (int argc, char *argv[]) +{ + // Number of Work objects to put into the Task pool + int iterations = argc > 1 ? atoi (argv[1]) : 4; + // Number of threads for each Task + int threads = argc > 2 ? atoi (argv[2]) : 2; + // Number of tasks to chain after the primary task + int subtasks = argc > 3 ? atoi(argv[3]) : 1; + + (void) run_test (iterations, threads, subtasks); + + ACE_DEBUG ((LM_DEBUG, "(%P|%t) Application exiting\n")); + + exit (0); +} diff --git a/docs/tutorials/013/mld.cpp b/docs/tutorials/013/mld.cpp new file mode 100644 index 00000000000..c9b660a057e --- /dev/null +++ b/docs/tutorials/013/mld.cpp @@ -0,0 +1,24 @@ + +// $Id$ + +#include "mld.h" + +ACE_Atomic_Op<ACE_Mutex,int> mld_counter::counter_; + +// Passthrough to the static atomic op +int mld_counter::value(void) +{ + return counter_.value(); +} + +// Increment the counter when a new mld is created... +mld::mld(void) +{ + ++MLD_COUNTER->counter_; +} + +// and decrement it when the object is destructed. +mld::~mld(void) +{ + --MLD_COUNTER->counter_; +} diff --git a/docs/tutorials/013/mld.h b/docs/tutorials/013/mld.h new file mode 100644 index 00000000000..2156484996a --- /dev/null +++ b/docs/tutorials/013/mld.h @@ -0,0 +1,44 @@ + +// $Id$ + +#ifndef MLD_H +#define MLD_H + +#include "ace/Synch.h" +#include "ace/Singleton.h" + +/* + This is a cheap memory leak detector. Each class I want to watch + over contains an mld object. The mld object's ctor increments a + global counter while the dtor decrements it. If the counter is + non-zero when the program is ready to exit then there may be a leak. +*/ + +class mld +{ +public: + mld(void); + ~mld(void); + + static int value(void); + +protected: + static ACE_Atomic_Op<ACE_Mutex,int> counter_; +}; + +//================================================ + +/* + Just drop 'MLD' anywhere in your class definition to get cheap + memory leak detection for your class. + + + Use 'MLD_COUNTER->value()' in main() to see if things are OK. We + don't really need a singleton since the counter itself is static and + thread safe but it makes the interface to mld simpler. + */ +#define MLD mld mld_ + +//================================================ + +#endif diff --git a/docs/tutorials/013/page01.html b/docs/tutorials/013/page01.html new file mode 100644 index 00000000000..217a2f9bf1d --- /dev/null +++ b/docs/tutorials/013/page01.html @@ -0,0 +1,41 @@ +<HTML> +<HEAD> + <META HTTP-EQUIV="Content-Type" CONTENT="text/html; charset=iso-8859-1"> + <META NAME="Author" CONTENT="James CE Johnson"> + <TITLE>ACE Tutorial 013</TITLE> +</HEAD> +<BODY TEXT="#000000" BGCOLOR="#FFFFFF" LINK="#000FFF" VLINK="#FF0F0F"> + +<CENTER><B><FONT SIZE=+2>ACE Tutorial 013</FONT></B></CENTER> + +<CENTER><B><FONT SIZE=+2>Multiple thread pools</FONT></B></CENTER> + + +<P> +<HR WIDTH="100%"> +<P> +My intent with this tutorial was to derive from ACE_Data_Block instead +of ACE_Message_Block so that we could leverage the reference counting +nature of that object. +<P> +Along the way, I sort of got distracted... What I ended up with is a +poor excuse for ACE_Stream that implements a simple state machine. +<P> +The application is built around a thread pool where the pool's svc() +method takes work units from the message queue for processing. As +each unit is taken from the queue, the process() method is invoked to +do some work. The twist is that after processing the message, we +enqueue it into another thread pool to do more work. This continues +through a chain of thread pools until the last where the unit's fini() +method is called for finishing up any outstanding work. +<P> +The chain of thread pools is uni-directional using a singly-linked +list of Task derivatives. Each pool has the same number of tasks in +order to keep things simple. +<P> +<HR WIDTH="100%"> +<CENTER>[<A HREF="..">Tutorial Index</A>] [<A HREF="page02.html">Continue +This Tutorial</A>]</CENTER> + +</BODY> +</HTML> diff --git a/docs/tutorials/013/page02.html b/docs/tutorials/013/page02.html new file mode 100644 index 00000000000..785b0c86b75 --- /dev/null +++ b/docs/tutorials/013/page02.html @@ -0,0 +1,112 @@ +<HTML> +<HEAD> + <META HTTP-EQUIV="Content-Type" CONTENT="text/html; charset=iso-8859-1"> + <META NAME="Author" CONTENT="James CE Johnson"> + <TITLE>ACE Tutorial 013</TITLE> +</HEAD> +<BODY TEXT="#000000" BGCOLOR="#FFFFFF" LINK="#000FFF" VLINK="#FF0F0F"> + +<CENTER><B><FONT SIZE=+2>ACE Tutorial 013</FONT></B></CENTER> + +<CENTER><B><FONT SIZE=+2>Multiple thread pools</FONT></B></CENTER> + + +<P> +<HR WIDTH="100%"> +<P> +We'll go back to our tradition of looking at main() first. The only +change here from our "normal" thread pool is the ability to specify +the number of subtasks for the pool. (Each subtask is another thread +pool in the chain. I suppose I should have named that better...) +I've still got the custom Message_Block so that, at this level, we +don't even know about custom Data_Blocks. +<P> +<HR WIDTH="100%"> +<PRE> +#include "mld.h" +#include "task.h" +#include "work.h" +#include "block.h" + +int run_test (int iterations, int threads, int subtasks) +{ + // Create a task with some subtasks. Each Task is a thread + // pool of 'threads' size. If a task has a subtask, it will + // forward the unit of work to the subtask when finished. See + // task.{h|cpp} for more details. + Task * task = new Task(subtasks); + + if (task->open (threads) == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), -1); + } + + // Give the threads a chance to get ready. + ACE_OS::sleep (ACE_Time_Value (1)); + + for (int i = 0; i < iterations; ++i) + { + // Create a custom message block that can contain our Work object + Message_Block *message = new Message_Block( new Work(i) ); + + // Put the "unit of work" into the message queue + if (task->putq (message) == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "putq"), -1); + } + } + + // The default constructor of our custom message block will + // insert a message telling our task to shutdown. + Message_Block *message = new Message_Block( ); + + // Put the shutdown request into the thread pool + if (task->putq (message) == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "putq"), -1); + } + + // Wait for the task to shut down. Any subtasks will also be + // waited for. + task->wait (); + + // Delete our Task to prevent a memory leak + delete task; + + // Ask our memory leak detector if things are OK + if( MLD_COUNTER->value() != 0 ) + { + ACE_DEBUG ((LM_DEBUG, "(%P|%t) Memory Leak!\n")); + } + + return (0); +} + +int main (int argc, char *argv[]) +{ + // Number of Work objects to put into the Task pool + int iterations = argc > 1 ? atoi (argv[1]) : 4; + // Number of threads for each Task + int threads = argc > 2 ? atoi (argv[2]) : 2; + // Number of tasks to chain after the primary task + int subtasks = argc > 3 ? atoi(argv[3]) : 1; + + (void) run_test (iterations, threads, subtasks); + + ACE_DEBUG ((LM_DEBUG, "(%P|%t) Application exiting\n")); + + exit (0); +} +</PRE> +<HR WIDTH="100%"> +<P> +Nothing really surprising here... Just remember that your total +number of threads is ( ( 1 + subtasks ) * threads ). You probably +don't want to get too carried away with that! +<P> +<HR WIDTH="100%"> +<CENTER>[<A HREF="..">Tutorial Index</A>] [<A HREF="page03.html">Continue +This Tutorial</A>]</CENTER> + +</BODY> +</HTML> diff --git a/docs/tutorials/013/page03.html b/docs/tutorials/013/page03.html new file mode 100644 index 00000000000..37b8cb2d027 --- /dev/null +++ b/docs/tutorials/013/page03.html @@ -0,0 +1,111 @@ +<HTML> +<HEAD> + <META HTTP-EQUIV="Content-Type" CONTENT="text/html; charset=iso-8859-1"> + <META NAME="Author" CONTENT="James CE Johnson"> + <TITLE>ACE Tutorial 013</TITLE> +</HEAD> +<BODY TEXT="#000000" BGCOLOR="#FFFFFF" LINK="#000FFF" VLINK="#FF0F0F"> + +<CENTER><B><FONT SIZE=+2>ACE Tutorial 013</FONT></B></CENTER> + +<CENTER><B><FONT SIZE=+2>Multiple thread pools</FONT></B></CENTER> + + +<P> +<HR WIDTH="100%"> +<P> +I did eventually create that ACE_Data_Block derivative that I wanted. +My purpose in doing so was to use the reference-counting +that is provided by ACE_Data_Block and ACE_Message_Block interactions. + When you're working with an object in a single +thread, it's generally not so difficult to manage it's lifetime. +That is, it doesn't tend to go out of scope or get destroyed unless +you do it on purpose. +<P> +On the other hand, if you're passing data between several threads, it +is easy to loose track of who "owns" the data at any one time. All +too frequently, data will be deleted by one thread while another is +still using it. Reference counting can prevent that. The rule of +thumb is that you increment the reference count of the object when you +hand it off to a new thread. You then decrement the count when you're +done with the object and let the object delete itself when there are +no more references. +<P> +To prove that all of that works correctly in the tutorial, I've +created a cheap Memory Leak Detector object. All mld instances +reference a thread-safe counter that is incremented when the mld is +constructed and decremented when destructed. I then insert an mld +into each of my dynamically created objects. If I get to the end of +main() and the counter isn't zero then I either didn't delete enough +or I deleted too many times. +<P> +Simple, cheap, effective. +<P> +<HR WIDTH="100%"> +<PRE> +#ifndef MLD_H +#define MLD_H + +#include "ace/Synch.h" +#include "ace/Singleton.h" + +/* + This is a cheap memory leak detector. Each class I want to watch + over contains an mld object. The mld object's ctor increments a + global counter while the dtor decrements it. If the counter is + non-zero when the program is ready to exit then there may be a leak. +*/ + +class mld +{ +public: + mld(void); + ~mld(void); +}; + +/* + The mld_counter will be made into a singleton so that all of the mld + objects can access it easily. +*/ + +class mld_counter +{ +public: + // Return the value of the counter + static int value(void); + +protected: + friend class mld; + + // Use ACE_Atomic_Op to make the counter thread-safe + static ACE_Atomic_Op<ACE_Mutex,int> counter_; +}; + +typedef ACE_Singleton<mld_counter,ACE_Mutex> MLD_Counter; + +//================================================ + +/* + A couple of handy macros to make things easier to remember. Just + drop 'MLD' anywhere in your class definition. + + Use 'MLD_COUNTER->value()' in main() to see if things are OK. We + don't really need a singleton since the counter itself is static and + thread safe but it makes the interface to mld simpler. + */ +#define MLD mld mld_ +#define MLD_COUNTER MLD_Counter::instance() + +//================================================ + +#endif +</PRE> +<HR WIDTH="100%"> +<P> +<P> +<HR WIDTH="100%"> +<CENTER>[<A HREF="..">Tutorial Index</A>] [<A HREF="page04.html">Continue +This Tutorial</A>]</CENTER> + +</BODY> +</HTML> diff --git a/docs/tutorials/013/page04.html b/docs/tutorials/013/page04.html new file mode 100644 index 00000000000..d10894eccb2 --- /dev/null +++ b/docs/tutorials/013/page04.html @@ -0,0 +1,38 @@ +<HTML> +<HEAD> + <META HTTP-EQUIV="Content-Type" CONTENT="text/html; charset=iso-8859-1"> + <META NAME="Author" CONTENT="James CE Johnson"> + <TITLE>ACE Tutorial 013</TITLE> +</HEAD> +<BODY TEXT="#000000" BGCOLOR="#FFFFFF" LINK="#000FFF" VLINK="#FF0F0F"> + +<CENTER><B><FONT SIZE=+2>ACE Tutorial 013</FONT></B></CENTER> + +<CENTER><B><FONT SIZE=+2>Multiple thread pools</FONT></B></CENTER> + + +<P> +<HR WIDTH="100%"> +<P> +We'll go back to our tradition of looking at main() first. The only +change here from our "normal" thread pool is the ability to specify +the number of subtasks for the pool. (Each subtask is another thread +pool in the chain. I suppose I should have named that better...) +I've still got the custom Message_Block so that, at this level, we +don't even know about custom Data_Blocks. +<P> +<HR WIDTH="100%"> +<PRE> +</PRE> +<HR WIDTH="100%"> +<P> +Nothing really surprising here... Just remember that your total +number of threads is ( ( 1 + subtasks ) * threads ). You probably +don't want to get too carried away with that! +<P> +<HR WIDTH="100%"> +<CENTER>[<A HREF="..">Tutorial Index</A>] [<A HREF="page04.html">Continue +This Tutorial</A>]</CENTER> + +</BODY> +</HTML> diff --git a/docs/tutorials/013/task.cpp b/docs/tutorials/013/task.cpp new file mode 100644 index 00000000000..399d0ff5029 --- /dev/null +++ b/docs/tutorials/013/task.cpp @@ -0,0 +1,190 @@ + +// $Id$ + +#include "task.h" +#include "block.h" +#include "work.h" + +/* + Construct the Task with zero or more subtasks. If subtasks are + requested, we assign our next_ pointer to the first of those and let + it worry about any remaining subtasks. + */ +Task::Task ( int sub_tasks ) +: barrier_ (0) + ,next_(0) +{ + ACE_DEBUG ((LM_DEBUG, "(%P|%t) Task ctor 0x%x\n", (void *) this)); + if( sub_tasks ) + { + next_ = new Task( --sub_tasks ); + } +} + +/* + Delete our barrier object and any subtasks we may have. +*/ +Task::~Task (void) +{ + ACE_DEBUG ((LM_DEBUG, "(%P|%t) Task dtor 0x%x\n", (void *) this)); + + delete barrier_; + delete next_; +} + +/* + Open our thread pool with the requested number of threads. If + subtasks are enabled, they inherit the thread-pool size. Make sure + that the subtasks can be opened before we open our own threadpool. +*/ +int Task::open (int threads ) +{ + if( next_ ) + { + if( next_->open(threads) == -1 ) + { + return -1; + } + } + + barrier_ = new ACE_Barrier (threads); + return this->activate (THR_NEW_LWP, threads); +} + +/* + Close ourselves and any subtasks. This just prints a message so + that we can assure ourselves things are cleaned up correctly. +*/ +int Task::close (u_long flags) +{ + ACE_DEBUG ((LM_DEBUG, "(%P|%t) Task close 0x%x\n", (void *) this)); + if( next_ ) + { + next_->close(flags); + } + + return(0); +} + +/* + Wait for all of the threads in our pool to exit and then wait for + any subtasks. When called from the front of the task chain, this + won't return until all thread pools in the chain have exited. +*/ +int Task::wait(void) +{ + inherited::wait(); + if( next_ ) + { + next_->wait(); + } + return(0); +} + +/* + Like the thread-pools before, this is where all of the work is done. +*/ +int Task::svc (void) +{ + // Wait for all threads to get this far before continuing. + this->barrier_->wait (); + + ACE_DEBUG ((LM_DEBUG, "(%P|%t) Task 0x%x starts in thread %u\n", (void *) this, ACE_Thread::self ())); + + // getq() wants an ACE_Message_Block so we'll start out with one + // of those. We could do some casting (or even auto-casting) to + // avoid the extra variable but I prefer to be clear about our actions. + ACE_Message_Block * message; + // What we really put into the queue was our Message_Block. + // After we get the message from the queue, we'll cast it to this + // so that we know how to work on it. + Message_Block * message_block; + // And, of course, our Message_Block contains our Data_Block + // instead of the typical ACE_Data_Block + Data_Block * data_block; + // Even though we put Work objects into the queue, we take them + // out using the baseclass pointer. This allows us to create new + // derivatives without having to change this svc() method. + Unit_Of_Work * work; + + while (1) + { + // Get the ACE_Message_Block + if (this->getq (message) == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "getq"), -1); + } + + // "Convert" it to our Message_Block + message_block = (Message_Block*)message; + + // Get the ACE_Data_Block and "convert" to Data_Block in one step. + data_block = (Data_Block*)(message_block->data_block()); + + // Get the unit of work from the data block + work = data_block->data(); + + // Show the object's instance value and "type name" + work->who_am_i(); + work->what_am_i(); + + // If there is a hangup we need to tell our pool-peers as + // well as any subtasks. + if (message_block->msg_type () == ACE_Message_Block::MB_HANGUP) + { + // duplicate()ing the message block will increment the + // reference counts on the data blocks. This allows us + // to safely release() the message block. The rule of + // thumb is that if you pass a message block to a new + // owner, duplicate() it. Then you can release() when + // you're done and not worry about memory leaks. + if( this->putq (message_block->duplicate()) == -1 ) + { + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "putq"), -1); + } + + // If we have a subtask, duplicate() the message block + // again and pass it to that task's queue + if( next_ && next_->putq(message_block->duplicate()) == -1 ) + { + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "putq"), -1); + } + + // We're now done with our copy of the block, so we can + // release it. Our peers/subtasks have their own message + // block to access the shared data blocks. + message_block->release(); + + break; + } + + // If this isn't a hangup/shutdown message then we tell the + // unit of work to process() for a while. + work->process(); + + if( next_ ) + { + // If we have subtasks, we pass the block on to them. Notice + // that I don't bother to duplicate() the block since I won't + // release it in this case. I could have invoked + // duplicate() in the puq() and then release() + // afterwards. Either is acceptable. + if( next_->putq(message_block) == -1 ) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "putq"), -1); + } + else + { + // If we don't have subtasks then invoke fini() to tell + // the unit of work that we won't be invoking process() + // any more. Then release() the block. This release() + // would not change if we duplicate()ed in the above conditional + work->fini(); + message_block->release(); + } + + // Pretend that the work takes some time... + ACE_OS::sleep (ACE_Time_Value (0, 250)); + } + + return (0); +} diff --git a/docs/tutorials/013/task.h b/docs/tutorials/013/task.h new file mode 100644 index 00000000000..688b739a765 --- /dev/null +++ b/docs/tutorials/013/task.h @@ -0,0 +1,48 @@ + +// $Id$ + +#ifndef TASK_H +#define TASK_H + +#include "ace/Task.h" +#include "mld.h" + +/* + This is much like the Task we've used in the past for implementing a + thread pool. This time, however, I've made the Task an element in a + singly-linked list. As the svc() method finishes the process() on a + unit of work, it will enqueue that unit of work to the next_ Task if + there is one. If the Task does not have a next_ Task, it will + invoke the unit of work object's fini() method after invoking process(). + */ +class Task : public ACE_Task < ACE_MT_SYNCH > +{ +public: + + typedef ACE_Task < ACE_MT_SYNCH > inherited; + + // Construct ourselves and an optional number of subtasks + // chained beyond us. + Task ( int sub_tasks = 0 ); + ~Task (void); + + // Open the Task with the proper thread-pool size + int open (int threads = 1 ); + + // Take Unit_Of_Work objects from the thread pool and invoke + // their process() and/or fini() as appropriate. + int svc (void); + + // Shut down the thread pool and it's associated subtasks + int close (u_long flags = 0); + + // Wait for the pool and subtasks to close + int wait(void); + +protected: + ACE_Barrier * barrier_; + Task * next_; + MLD; +}; + +#endif diff --git a/docs/tutorials/013/work.cpp b/docs/tutorials/013/work.cpp new file mode 100644 index 00000000000..f4f299c59a3 --- /dev/null +++ b/docs/tutorials/013/work.cpp @@ -0,0 +1,125 @@ + +// $Id$ + +#include "work.h" + +/* + Initialize the state to zero +*/ +Unit_Of_Work::Unit_Of_Work (void) + : state_(0) +{ + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Unit_Of_Work ctor\n", (void *) this)); +} + +Unit_Of_Work::~Unit_Of_Work (void) +{ + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Unit_Of_Work dtor\n", (void *) this )); +} + +/* + Display our instance value +*/ +void Unit_Of_Work::who_am_i (void) +{ + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Unit_Of_Work instance\n", (void *) this)); +} + +/* + Dispay our type name +*/ +void Unit_Of_Work::what_am_i (void) +{ + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x I am a Unit_Of_Work object\n", (void*)this)); +} + +/* + Return failure. You should always derive from Unit_Of_Work... +*/ +int Unit_Of_Work::process(void) +{ + return -1; +} + +/* + ditto +*/ +int Unit_Of_Work::fini(void) +{ + return -1; +} + +/* + Default constructor has no "message number" +*/ +Work::Work (void) + : message_ (-1) +{ + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Work ctor\n", (void *) this)); +} + +/* + The useful constructor remembers which message it is and will tell + you if you ask. +*/ +Work::Work (int message) + : message_ (message) +{ + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Work ctor for message %d\n", (void *) this, message_)); +} + +Work::~Work (void) +{ + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Work dtor\n", (void *) this)); +} + +/* + This objects type name is different from the baseclass +*/ +void Work::what_am_i (void) +{ + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x I am a Work object for message %d\n", (void*)this, message_)); +} + +/* + A very simple state machine that just walks through three stages. + If it is called more than that, it will tell you not to bother. +*/ +int Work::process(void) +{ + switch( ++state_ ) + { + case 1: + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Stage One\n",(void*)this)); + break; + case 2: + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Stage Two\n",(void*)this)); + break; + case 3: + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Stage Three\n",(void*)this)); + break; + default: + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x No work to do in state %d\n", + (void*)this, state_.value())); + break; + } + return(0); +} + +/* + If you don't have enough subtasks in the chain then the state + machine won't progress to the end. The fini() hook will allow us to + recover from that by executing the remaining states in the final + task of the chain. +*/ +int Work::fini(void) +{ + while( state_.value() < 3 ) + { + if( this->process() == -1 ) + { + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "process"), -1); + } + } + return(0); +} diff --git a/docs/tutorials/013/work.h b/docs/tutorials/013/work.h new file mode 100644 index 00000000000..6cdaa42d104 --- /dev/null +++ b/docs/tutorials/013/work.h @@ -0,0 +1,67 @@ + +// $Id$ + +#ifndef WORK_H +#define WORK_H + +#include "ace/Log_Msg.h" +#include "ace/Synch.h" +#include "mld.h" + +/* + Our specilized message queue and thread pool will know how to do + "work" on our Unit_Of_Work baseclass. +*/ +class Unit_Of_Work +{ +public: + Unit_Of_Work (void); + + virtual ~ Unit_Of_Work (void); + + // Display the object instance value + void who_am_i (void); + + // The baseclass can override this to show it's "type name" + virtual void what_am_i (void); + + // This is where you do application level logic. It will be + // called once for each thread pool it passes through. It + // would typically implement a state machine and execute a + // different state on each call. + virtual int process(void); + + // This is called by the last Task in the series (see task.h) + // in case our process() didn't get through all of it's states. + virtual int fini(void); + +protected: + ACE_Atomic_Op<ACE_Mutex,int> state_; + MLD; +}; + +/* + A fairly trivial work derivative that implements an equally trivial + state machine in process() +*/ +class Work : public Unit_Of_Work +{ +public: + Work (void); + + Work (int message); + + virtual ~ Work (void); + + void what_am_i (void); + + int process(void); + + int fini(void); + +protected: + int message_; + MLD; +}; + +#endif |