author    | jcej <jcej@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1998-09-27 15:07:51 +0000
committer | jcej <jcej@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1998-09-27 15:07:51 +0000
commit    | c03ae3c67575f06c9aa66499db801d42dc8f6500 (patch)
tree      | eb4b5084f3cac71e457dcc374f0d2785115bcbca /docs
parent    | 6973456686b766f3c9a61a8e1bf73deea5dd250d (diff)
download  | ATCD-c03ae3c67575f06c9aa66499db801d42dc8f6500.tar.gz
*** empty log message ***
Diffstat (limited to 'docs')
-rw-r--r-- | docs/tutorials/013/block.cpp         | 109
-rw-r--r-- | docs/tutorials/013/block.h           | 105
-rw-r--r-- | docs/tutorials/013/message_queue.cpp | 114
-rw-r--r-- | docs/tutorials/013/mld.cpp           |  14
-rw-r--r-- | docs/tutorials/013/mld.h             |  32
-rw-r--r-- | docs/tutorials/013/page06.html       | 286
-rw-r--r-- | docs/tutorials/013/page07.html       | 244
-rw-r--r-- | docs/tutorials/013/page08.html       |  45
-rw-r--r-- | docs/tutorials/013/task.cpp          | 285
-rw-r--r-- | docs/tutorials/013/task.h            |  44
-rw-r--r-- | docs/tutorials/013/work.cpp          | 129
-rw-r--r-- | docs/tutorials/013/work.h            |  64
12 files changed, 1025 insertions, 446 deletions
diff --git a/docs/tutorials/013/block.cpp b/docs/tutorials/013/block.cpp index b23841af6ef..5d997510244 100644 --- a/docs/tutorials/013/block.cpp +++ b/docs/tutorials/013/block.cpp @@ -4,94 +4,95 @@ #include "block.h" /* - Construct a Dat_Block to contain a unit of work. Note the careful - construction of the baseclass to set the block type and the locking strategy. + Construct a Dat_Block to contain a unit of work. Note the careful + construction of the baseclass to set the block type and the locking + strategy. */ -Data_Block::Data_Block( Unit_Of_Work * _data ) - : inherited(0,ACE_Message_Block::MB_DATA,0,0,new Lock(),0,0) - ,data_(_data) +Data_Block::Data_Block (Unit_Of_Work * _data) +: inherited (0, ACE_Message_Block::MB_DATA, 0, 0, new Lock (), 0, 0) +,data_ (_data) { - ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Data_Block ctor for 0x%x\n", (void *) this, (void*)data_)); + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Data_Block ctor for 0x%x\n", (void *) this, (void *) data_)); } /* - The Lock object created in the constructor is stored in the - baseclass and available through the locking_strategy() method. We - can cast it's value to our Lock object and invoke the destroy() to - indicate that we want it to go away when the lock is released. + The Lock object created in the constructor is stored in the baseclass and + available through the locking_strategy() method. We can cast it's value to + our Lock object and invoke the destroy() to indicate that we want it to go + away when the lock is released. */ -Data_Block::~Data_Block(void) +Data_Block::~Data_Block (void) { - ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Data_Block dtor for 0x%x\n", (void *) this, (void*)data_)); - ((Lock*)locking_strategy())->destroy(); - delete data_; + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Data_Block dtor for 0x%x\n", (void *) this, (void *) data_)); + ((Lock *) locking_strategy ())->destroy (); + delete data_; } /* - Return the data -*/ -Unit_Of_Work * Data_Block::data(void) + Return the data + */ +Unit_Of_Work *Data_Block::data (void) { - return this->data_; + return this->data_; } -Data_Block::Lock::Lock(void) - : destroy_(0) +Data_Block:: Lock::Lock (void) +:destroy_ (0) { - ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Lock ctor\n", (void *) this )); + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Lock ctor\n", (void *) this)); } -Data_Block::Lock::~Lock(void) +Data_Block:: Lock::~Lock (void) { - ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Lock dtor\n", (void *) this )); + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Lock dtor\n", (void *) this)); } /* - Set our destroy_ flag so that the next lock release will cause us to - be deleted. -*/ -int Data_Block::Lock::destroy(void) + Set our destroy_ flag so that the next lock release will cause us to be + deleted. + */ +int Data_Block::Lock::destroy (void) { - ++destroy_; - return(0); + ++destroy_; + return (0); } /* - Mutexes have acquire() and release() methods. We've overridden the - latter so that when the object we're protecting goes away, we can - make ourselves go away after the lock is released. -*/ -int Data_Block::Lock::release(void) + Mutexes have acquire() and release() methods. We've overridden the latter + so that when the object we're protecting goes away, we can make ourselves go + away after the lock is released. 
+ */ +int Data_Block::Lock::release (void) { - int rval = inherited::release(); - if( destroy_ ) - { - delete this; - } - return rval; + int rval = inherited::release (); + if (destroy_) + { + delete this; + } + return rval; } /* - Create an baseclass unit of work when we instantiate a hangup message. -*/ -Message_Block::Message_Block( void ) - : inherited( new Data_Block( new Unit_Of_Work() ) ) + Create an baseclass unit of work when we instantiate a hangup message. + */ +Message_Block::Message_Block (void) +:inherited (new Data_Block (new Unit_Of_Work ())) { - ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Message_Block ctor for shutdown\n", (void *) this )); - this->msg_type( MB_HANGUP ); + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Message_Block ctor for shutdown\n", (void *) this)); + this->msg_type (MB_HANGUP); } /* - Store the unit of work in a Data_Block and initialize the baseclass - with that data. -*/ -Message_Block::Message_Block( Unit_Of_Work * _data ) - : inherited( new Data_Block(_data) ) + Store the unit of work in a Data_Block and initialize the baseclass with + that data. + */ +Message_Block::Message_Block (Unit_Of_Work * _data) +:inherited (new Data_Block (_data)) { - ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Message_Block ctor for 0x%x\n", (void *) this, (void*)_data)); + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Message_Block ctor for 0x%x\n", (void *) this, (void *) _data)); } -Message_Block::~Message_Block( void ) +Message_Block::~Message_Block (void) { - ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Message_Block dtor\n", (void *) this )); + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Message_Block dtor\n", (void *) this)); } diff --git a/docs/tutorials/013/block.h b/docs/tutorials/013/block.h index e6e05c5312c..40a31bd1b07 100644 --- a/docs/tutorials/013/block.h +++ b/docs/tutorials/013/block.h @@ -10,76 +10,75 @@ #include "work.h" /* - In this Tutorial, we derive from ACE_Data_Block for our special - data. With the possiblilty that our Task object may forward the - unit of work on to another thread pool, we have to make sure that - the data object doesn't go out of scope unexpectedly. An - ACE_Message_Block will be deleted as soon as it's release() method - is called but the ACE_Data_Blocks it uses are reference counted and - only delete when the last reference release()es the block. We use - that trait to simply our object memory management. + In this Tutorial, we derive from ACE_Data_Block for our special data. With + the possiblilty that our Task object may forward the unit of work on to + another thread pool, we have to make sure that the data object doesn't go + out of scope unexpectedly. An ACE_Message_Block will be deleted as soon as + it's release() method is called but the ACE_Data_Blocks it uses are + reference counted and only delete when the last reference release()es the + block. We use that trait to simply our object memory management. */ class Data_Block : public ACE_Data_Block { public: - typedef ACE_Data_Block inherited; + typedef ACE_Data_Block inherited; - // Create a data block with a unit of work to be done - Data_Block( Unit_Of_Work * _data ); - - ~Data_Block(void); + // Create a data block with a unit of work to be done + Data_Block (Unit_Of_Work * _data); + + ~Data_Block (void); + + // Returns the work pointer + Unit_Of_Work *data (void); - // Returns the work pointer - Unit_Of_Work * data(void); - protected: Unit_Of_Work * data_; - MLD; // Our memory leak detector - - // The ACE_Data_Block allows us to choose a locking strategy - // for making the reference counting thread-safe. 
The - // ACE_Lock_Adaptor<> template adapts the interface of a - // number of lock objects so thatthe ACE_Message_Block will - // have an interface it can use. - class Lock : public ACE_Lock_Adapter<ACE_Mutex> - { - public: - typedef ACE_Lock_Adapter<ACE_Mutex> inherited; - - Lock(void); - ~Lock(void); - - // When the Data_Block is destroyed, the Message_Block is - // holding a lock with this object. If we were to destroy - // the Lock with the Data_Block, we would have a - // segfault. Instead, the Data_Block invokes destroy() to - // mark the object as un-needed so that when the - // Message_Block invokes release() to drop the lock, the - // Lock can delete itself. - int destroy(void); - int release(void); - protected: - int destroy_; - MLD; - }; + MLD; // Our memory leak detector + + // The ACE_Data_Block allows us to choose a locking strategy + // for making the reference counting thread-safe. The + // ACE_Lock_Adaptor<> template adapts the interface of a + // number of lock objects so thatthe ACE_Message_Block will + // have an interface it can use. + class Lock : public ACE_Lock_Adapter < ACE_Mutex > + { +public: + typedef ACE_Lock_Adapter < ACE_Mutex > inherited; + + Lock (void); + ~Lock (void); + + // When the Data_Block is destroyed, the Message_Block is + // holding a lock with this object. If we were to destroy + // the Lock with the Data_Block, we would have a + // segfault. Instead, the Data_Block invokes destroy() to + // mark the object as un-needed so that when the + // Message_Block invokes release() to drop the lock, the + // Lock can delete itself. + int destroy (void); + int release (void); +protected: + int destroy_; + MLD; + }; }; /* - This simple derivative of ACE_Message_Block will construct our - Data_Block object to contain a unit of work. + This simple derivative of ACE_Message_Block will construct our Data_Block + object to contain a unit of work. */ class Message_Block : public ACE_Message_Block { public: - typedef ACE_Message_Block inherited; - - Message_Block( void ); - Message_Block( Unit_Of_Work * _data ); - - ~Message_Block( void ); + typedef ACE_Message_Block inherited; + + Message_Block (void); + Message_Block (Unit_Of_Work * _data); + + ~Message_Block (void); protected: MLD; }; - + #endif diff --git a/docs/tutorials/013/message_queue.cpp b/docs/tutorials/013/message_queue.cpp index cabfc5d4655..5128cbbfac4 100644 --- a/docs/tutorials/013/message_queue.cpp +++ b/docs/tutorials/013/message_queue.cpp @@ -8,79 +8,81 @@ int run_test (int iterations, int threads, int subtasks) { - // Create a task with some subtasks. Each Task is a thread - // pool of 'threads' size. If a task has a subtask, it will - // forward the unit of work to the subtask when finished. See - // task.{h|cpp} for more details. - Task * task = new Task(subtasks); + // Create a task with some subtasks. Each Task is a thread + // pool of 'threads' size. If a task has a subtask, it will + // forward the unit of work to the subtask when finished. See + // task.{h|cpp} for more details. + Task *task = new Task (subtasks); - if (task->open (threads) == -1) - { - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), -1); - } + if (task->open (threads) == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), -1); + } + + // Give the threads a chance to get ready. +ACE_OS::sleep (ACE_Time_Value (1)); - // Give the threads a chance to get ready. 
- ACE_OS::sleep (ACE_Time_Value (1)); + for (int i = 0; i < iterations; ++i) + { + // Create a custom message block that can contain our Work object + Message_Block *message = new Message_Block (new Work (i)); - for (int i = 0; i < iterations; ++i) + // Put the "unit of work" into the message queue + if (task->putq (message) == -1) { - // Create a custom message block that can contain our Work object - Message_Block *message = new Message_Block( new Work(i) ); - - // Put the "unit of work" into the message queue - if (task->putq (message) == -1) - { - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "putq"), -1); - } + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "putq"), -1); } + } - // The default constructor of our custom message block will - // insert a message telling our task to shutdown. - Message_Block *message = new Message_Block( ); + // The default constructor of our custom message block will + // insert a message telling our task to shutdown. + Message_Block *message = new Message_Block (); - // Put the shutdown request into the thread pool - if (task->putq (message) == -1) - { - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "putq"), -1); - } + // Put the shutdown request into the thread pool + if (task->putq (message) == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "putq"), -1); + } - // Wait for the task to shut down. Any subtasks will also be - // waited for. - task->wait (); + // Wait for the task to shut down. Any subtasks will also be + // waited for. + task->wait (); - // Delete our Task to prevent a memory leak - delete task; + // Delete our Task to prevent a memory leak + delete task; - // Ask our memory leak detector if things are OK - if( MLD_COUNTER != 0 ) - { - ACE_DEBUG ((LM_DEBUG, "(%P|%t) Memory Leak!\n")); - } - - return (0); + // Ask our memory leak detector if things are OK + if (MLD_COUNTER != 0) + { + ACE_DEBUG ((LM_DEBUG, "(%P|%t) Memory Leak!\n")); + } + + return (0); } int main (int argc, char *argv[]) { - // Number of Work objects to put into the Task pool - int iterations = argc > 1 ? atoi (argv[1]) : 4; - // Number of threads for each Task - int threads = argc > 2 ? atoi (argv[2]) : 2; - // Number of tasks to chain after the primary task - int subtasks = argc > 3 ? atoi(argv[3]) : 1; - - (void) run_test (iterations, threads, subtasks); - - ACE_DEBUG ((LM_DEBUG, "(%P|%t) Application exiting\n")); - - exit (0); + // Number of Work objects to put into the Task pool + int iterations = argc > 1 ? atoi (argv[1]) : 4; + // Number of threads for each Task + int threads = argc > 2 ? atoi (argv[2]) : 2; + // Number of tasks to chain after the primary task + int subtasks = argc > 3 ? 
atoi (argv[3]) : 1; + + (void) run_test (iterations, threads, subtasks); + + ACE_DEBUG ((LM_DEBUG, "(%P|%t) Application exiting\n")); + + exit (0); } #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) -template class ACE_Guard<ACE_Mutex>; -template class ACE_Lock_Adapter<ACE_Mutex>; -template class ACE_Atomic_Op<ACE_Mutex, int>; +template class ACE_Guard < ACE_Mutex >; +template class ACE_Lock_Adapter < ACE_Mutex >; +template class ACE_Atomic_Op < ACE_Mutex, int >; #elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) #pragma instantiate ACE_Guard<ACE_Mutex>; #pragma instantiate ACE_Lock_Adapter<ACE_Mutex>; #pragma instantiate ACE_Atomic_Op<ACE_Mutex, int>; -#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ +#endif /* + ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION + */ diff --git a/docs/tutorials/013/mld.cpp b/docs/tutorials/013/mld.cpp index 08570cfb2ce..64ffdf1b144 100644 --- a/docs/tutorials/013/mld.cpp +++ b/docs/tutorials/013/mld.cpp @@ -3,21 +3,21 @@ #include "mld.h" -ACE_Atomic_Op<ACE_Mutex,int> mld::counter_(0); +ACE_Atomic_Op < ACE_Mutex, int >mld::counter_ (0); // Increment the counter when a new mld is created... -mld::mld(void) +mld::mld (void) { - ++counter_; + ++counter_; } // and decrement it when the object is destructed. -mld::~mld(void) +mld::~mld (void) { - --counter_; + --counter_; } -int mld::value(void) +int mld::value (void) { - return counter_.value(); + return counter_.value (); } diff --git a/docs/tutorials/013/mld.h b/docs/tutorials/013/mld.h index 82a83515121..a9ee1de02b4 100644 --- a/docs/tutorials/013/mld.h +++ b/docs/tutorials/013/mld.h @@ -8,37 +8,37 @@ #include "ace/Singleton.h" /* - This is a cheap memory leak detector. Each class I want to watch - over contains an mld object. The mld object's ctor increments a - global counter while the dtor decrements it. If the counter is - non-zero when the program is ready to exit then there may be a leak. -*/ + This is a cheap memory leak detector. Each class I want to watch over + contains an mld object. The mld object's ctor increments a global counter + while the dtor decrements it. If the counter is non-zero when the program + is ready to exit then there may be a leak. + */ class mld { public: - mld(void); - ~mld(void); + mld (void); + ~mld (void); + + static int value (void); - static int value(void); - protected: - static ACE_Atomic_Op<ACE_Mutex,int> counter_; + static ACE_Atomic_Op < ACE_Mutex, int >counter_; }; -//================================================ +// ================================================ /* - Just drop 'MLD' anywhere in your class definition to get cheap - memory leak detection for your class. + Just drop 'MLD' anywhere in your class definition to get cheap memory leak + detection for your class. */ #define MLD mld mld_ /* - Use 'MLD_COUNTER' in main() to see if things are OK. -*/ + Use 'MLD_COUNTER' in main() to see if things are OK. 
+ */ #define MLD_COUNTER mld::value() -//================================================ +// ================================================ #endif diff --git a/docs/tutorials/013/page06.html b/docs/tutorials/013/page06.html new file mode 100644 index 00000000000..972d5755e19 --- /dev/null +++ b/docs/tutorials/013/page06.html @@ -0,0 +1,286 @@ +<HTML> +<HEAD> + <META HTTP-EQUIV="Content-Type" CONTENT="text/html; charset=iso-8859-1"> + <META NAME="Author" CONTENT="James CE Johnson"> + <TITLE>ACE Tutorial 013</TITLE> +</HEAD> +<BODY TEXT="#000000" BGCOLOR="#FFFFFF" LINK="#000FFF" VLINK="#FF0F0F"> + +<CENTER><B><FONT SIZE=+2>ACE Tutorial 013</FONT></B></CENTER> + +<CENTER><B><FONT SIZE=+2>Multiple thread pools</FONT></B></CENTER> + + +<P> +<HR WIDTH="100%"> +<P> +Let's take a look now at the new Task object. This will obviously be +different from the Tasks we've created before but I think you'll be +surprised at how relatively simple it actually is. +<P> +Remember that the goal of this tutorial was to use the reference +counting abilities of the ACE_Data_Block. The only way to show that +effectively is to have a data block passed between different threads. +A thread pool isn't really going to do that so, instead, our new Task +can be part of a chain of tasks. In that way, each Task can pass the +data on to another and satisfy our need for moving the ACE_Data_Block +around. +If we've done the reference counting correctly then none of our tasks +will be trying to work with deleted data and we won't have any memory +leaks at the end. +<P> +There's not much to the header, so I've included it and the cpp file +on this one page. +<P> +<HR WIDTH="100%"> +<PRE> + +#include "ace/Task.h" +#include "mld.h" + +/* + This is much like the Task we've used in the past for implementing a + thread pool. This time, however, I've made the Task an element in a + singly-linked list. As the svc() method finishes the process() on a + unit of work, it will enqueue that unit of work to the next_ Task if + there is one. If the Task does not have a next_ Task, it will + invoke the unit of work object's fini() method after invoking process(). + */ +class Task : public ACE_Task < ACE_MT_SYNCH > +{ +public: + + typedef ACE_Task < ACE_MT_SYNCH > inherited; + + // Construct ourselves and an optional number of subtasks + // chained beyond us. + Task ( int sub_tasks = 0 ); + ~Task (void); + + // Open the Task with the proper thread-pool size + int open (int threads = 1 ); + + // Take Unit_Of_Work objects from the thread pool and invoke + // their process() and/or fini() as appropriate. + int svc (void); + + // Shut down the thread pool and it's associated subtasks + int close (u_long flags = 0); + + // Wait for the pool and subtasks to close + int wait(void); + +protected: + ACE_Barrier * barrier_; + Task * next_; + MLD; +}; + +<HR WIDTH="50%"> + +#include "task.h" +#include "block.h" +#include "work.h" + +/* + Construct the Task with zero or more subtasks. If subtasks are requested, + we assign our next_ pointer to the first of those and let it worry about + any remaining subtasks. + */ +Task::Task (int sub_tasks) +: barrier_ (0) + ,next_ (0) +{ + ACE_DEBUG ((LM_DEBUG, "(%P|%t) Task ctor 0x%x\n", (void *) this)); + if (sub_tasks) + { + next_ = new Task (--sub_tasks); + } +} + +/* + Delete our barrier object and any subtasks we may have. 
+ */ +Task::~Task (void) +{ + ACE_DEBUG ((LM_DEBUG, "(%P|%t) Task dtor 0x%x\n", (void *) this)); + + delete barrier_; + delete next_; +} + +/* + Open our thread pool with the requested number of threads. If subtasks are + enabled, they inherit the thread-pool size. Make sure that the subtasks can + be opened before we open our own threadpool. + */ +int Task::open (int threads) +{ + if (next_) + { + if (next_->open (threads) == -1) + { + return -1; + } + } + + barrier_ = new ACE_Barrier (threads); + return this->activate (THR_NEW_LWP, threads); +} + +/* + Close ourselves and any subtasks. This just prints a message so that we can + assure ourselves things are cleaned up correctly. + */ +int Task::close (u_long flags) +{ + ACE_DEBUG ((LM_DEBUG, "(%P|%t) Task close 0x%x\n", (void *) this)); + if (next_) + { + next_->close (flags); + } + + return (0); +} + +/* + Wait for all of the threads in our pool to exit and then wait for any + subtasks. When called from the front of the task chain, this won't return + until all thread pools in the chain have exited. + */ +int Task::wait (void) +{ + inherited::wait (); + if (next_) + { + next_->wait (); + } + return (0); +} + +/* + Like the thread-pools before, this is where all of the work is done. + */ +int Task::svc (void) +{ + // Wait for all threads to get this far before continuing. + this->barrier_->wait (); + + ACE_DEBUG ((LM_DEBUG, "(%P|%t) Task 0x%x starts in thread %u\n", (void *) this, ACE_Thread::self ())); + + // getq() wants an ACE_Message_Block so we'll start out with one + // of those. We could do some casting (or even auto-casting) to + // avoid the extra variable but I prefer to be clear about our actions. + ACE_Message_Block *message; + + // What we really put into the queue was our Message_Block. + // After we get the message from the queue, we'll cast it to this + // so that we know how to work on it. + Message_Block *message_block; + + // And, of course, our Message_Block contains our Data_Block + // instead of the typical ACE_Data_Block + Data_Block *data_block; + + // Even though we put Work objects into the queue, we take them + // out using the baseclass pointer. This allows us to create new + // derivatives without having to change this svc() method. + Unit_Of_Work *work; + + while (1) + { + // Get the ACE_Message_Block + if (this->getq (message) == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "getq"), -1); + } + + // "Convert" it to our Message_Block + message_block = (Message_Block *) message; + + // Get the ACE_Data_Block and "convert" to Data_Block in one step. + data_block = (Data_Block *) (message_block->data_block ()); + + // Get the unit of work from the data block + work = data_block->data (); + + // Show the object's instance value and "type name" + work->who_am_i (); + work->what_am_i (); + + // If there is a hangup we need to tell our pool-peers as + // well as any subtasks. + if (message_block->msg_type () == ACE_Message_Block::MB_HANGUP) + { + // duplicate()ing the message block will increment the + // reference counts on the data blocks. This allows us + // to safely release() the message block. The rule of + // thumb is that if you pass a message block to a new + // owner, duplicate() it. Then you can release() when + // you're done and not worry about memory leaks. 
+ if (this->putq (message_block->duplicate ()) == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "putq"), -1); + } + + // If we have a subtask, duplicate() the message block + // again and pass it to that task's queue + if (next_ && next_->putq (message_block->duplicate ()) == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "putq"), -1); + } + + // We're now done with our copy of the block, so we can + // release it. Our peers/subtasks have their own message + // block to access the shared data blocks. + message_block->release (); + + break; + } + + // If this isn't a hangup/shutdown message then we tell the + // unit of work to process() for a while. + work->process (); + + if (next_) + { + // If we have subtasks, we pass the block on to them. Notice + // that I don't bother to duplicate() the block since I won't + // release it in this case. I could have invoked + // duplicate() in the puq() and then release() + // afterwards. Either is acceptable. + if (next_->putq (message_block) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "putq"), -1); + } + else + { + // If we don't have subtasks then invoke fini() to tell + // the unit of work that we won't be invoking process() + // any more. Then release() the block. This release() + // would not change if we duplicate()ed in the above conditional + work->fini (); + message_block->release (); + } + + // Pretend that the work takes some time... + ACE_OS::sleep (ACE_Time_Value (0, 250)); + } + + return (0); +} +</PRE> +<HR WIDTH="100%"> +<P> +So you see... it wasn't really that much more complicated. We really +just have to remember to pass to <i>next_</i> when we finish working +on the data. If your Unit_Of_Work derivative is going to implement a +state machine be sure that you also implement a fini() method +<em>or</em> ensure that your chain of subtasks is large enough for all +possible states. +<P> +<HR WIDTH="100%"> +<CENTER>[<A HREF="..">Tutorial Index</A>] [<A HREF="page07.html">Continue +This Tutorial</A>]</CENTER> + +</BODY> +</HTML> diff --git a/docs/tutorials/013/page07.html b/docs/tutorials/013/page07.html new file mode 100644 index 00000000000..f29609074e4 --- /dev/null +++ b/docs/tutorials/013/page07.html @@ -0,0 +1,244 @@ +<HTML> +<HEAD> + <META HTTP-EQUIV="Content-Type" CONTENT="text/html; charset=iso-8859-1"> + <META NAME="Author" CONTENT="James CE Johnson"> + <TITLE>ACE Tutorial 013</TITLE> +</HEAD> +<BODY TEXT="#000000" BGCOLOR="#FFFFFF" LINK="#000FFF" VLINK="#FF0F0F"> + +<CENTER><B><FONT SIZE=+2>ACE Tutorial 013</FONT></B></CENTER> + +<CENTER><B><FONT SIZE=+2>Multiple thread pools</FONT></B></CENTER> + + +<P> +<HR WIDTH="100%"> +<P> +I've been trying to justify the chain of tasks by talking about a +Work object that implements a state machine. The idea is that your +Work object has to perform a series of discrete steps to complete it's +function. Traditionally, all of those steps would take place in one +thread of execution. That thread would probably be one from a Task +thread pool. +<P> +Suppose, however, that some of those steps spend a lot of time waiting +for disk IO. You could find that all of your thread-pool threads +are just sitting there waiting for the disk. You might then be +tempted to increase the thread pool size to get more work through. +However, if some of the stages are memory intensive, you could run out +of memory if all of the workers get to that state at the same time. +<P> +One solution might be to have different thread pools for each state. 
+Each pool could have it's size tuned appropriately for the work that +would be done there. That's where the chain of Tasks comes in. + In this tutorial's implementation I've taken the +easy route and set all of the thread pools to the same size but a more +realistic solution would be to set each thread pool in the chain to a +specific size as needed by that state of operation. +<P> +There's not much to this header either so I've combined it with the +cpp file as with task. +<P> +<HR WIDTH="100%"> +<PRE> +#include "ace/Log_Msg.h" +#include "ace/Synch.h" +#include "mld.h" + +/* + Our specilized message queue and thread pool will know how to do "work" on + our Unit_Of_Work baseclass. + */ +class Unit_Of_Work +{ +public: + Unit_Of_Work (void); + + virtual ~ Unit_Of_Work (void); + + // Display the object instance value + void who_am_i (void); + + // The baseclass can override this to show it's "type name" + virtual void what_am_i (void); + + // This is where you do application level logic. It will be + // called once for each thread pool it passes through. It + // would typically implement a state machine and execute a + // different state on each call. + virtual int process (void); + + // This is called by the last Task in the series (see task.h) + // in case our process() didn't get through all of it's states. + virtual int fini (void); + +protected: + ACE_Atomic_Op < ACE_Mutex, int >state_; + MLD; +}; + +/* + A fairly trivial work derivative that implements an equally trivial state + machine in process() + */ +class Work : public Unit_Of_Work +{ +public: + Work (void); + + Work (int message); + + virtual ~ Work (void); + + void what_am_i (void); + + int process (void); + + int fini (void); + +protected: + int message_; + MLD; +}; + +<HR WIDTH="50%"> + +#include "work.h" + +/* + Initialize the state to zero + */ +Unit_Of_Work::Unit_Of_Work (void) +: state_ (0) +{ + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Unit_Of_Work ctor\n", (void *) this)); +} + +Unit_Of_Work::~Unit_Of_Work (void) +{ + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Unit_Of_Work dtor\n", (void *) this)); +} + +/* + Display our instance value + */ +void Unit_Of_Work::who_am_i (void) +{ + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Unit_Of_Work instance\n", (void *) this)); +} + +/* + Dispay our type name + */ +void Unit_Of_Work::what_am_i (void) +{ + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x I am a Unit_Of_Work object\n", (void *) this)); +} + +/* + Return failure. You should always derive from Unit_Of_Work... + */ +int Unit_Of_Work::process (void) +{ + return -1; +} + +/* + ditto + */ +int Unit_Of_Work::fini (void) +{ + return -1; +} + +/* + Default constructor has no "message number" + */ +Work::Work (void) +:message_ (-1) +{ + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Work ctor\n", (void *) this)); +} + +/* + The useful constructor remembers which message it is and will tell you if + you ask. + */ +Work::Work (int message) +: message_ (message) +{ + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Work ctor for message %d\n", (void *) this, message_)); +} + +Work::~Work (void) +{ + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Work dtor\n", (void *) this)); +} + +/* + This objects type name is different from the baseclass + */ +void Work::what_am_i (void) +{ + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x I am a Work object for message %d\n", (void *) this, message_)); +} + +/* + A very simple state machine that just walks through three stages. If it is + called more than that, it will tell you not to bother. 
+ */ +int Work::process (void) +{ + switch (++state_) + { + case 1: + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Stage One\n", (void *) this)); + break; + case 2: + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Stage Two\n", (void *) this)); + break; + case 3: + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Stage Three\n", (void *) this)); + break; + default: + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x No work to do in state %d\n", + (void *) this, state_.value ())); + break; + } + return (0); +} + +/* + If you don't have enough subtasks in the chain then the state machine won't + progress to the end. The fini() hook will allow us to recover from that by + executing the remaining states in the final task of the chain. + */ +int Work::fini (void) +{ + while (state_.value () < 3) + { + if (this->process () == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "process"), -1); + } + } + return (0); +} + +</PRE> + +<HR WIDTH="100%"> +<P> +And that is that. For a more complex machine that may want to "jump +states" you would have to set some "state information" (sorry, bad +choice of terminology again) so that process() could decide what to do +at each call. You might also modify Task::svc() so that it will +respect the return value of process() and do something useful with the +information. +<P> +<HR WIDTH="100%"> +<CENTER>[<A HREF="..">Tutorial Index</A>] [<A HREF="page08.html">Continue +This Tutorial</A>]</CENTER> + +</BODY> +</HTML> diff --git a/docs/tutorials/013/page08.html b/docs/tutorials/013/page08.html new file mode 100644 index 00000000000..d4ceb862180 --- /dev/null +++ b/docs/tutorials/013/page08.html @@ -0,0 +1,45 @@ +<HTML> +<HEAD> + <META HTTP-EQUIV="Content-Type" CONTENT="text/html; charset=iso-8859-1"> + <META NAME="Author" CONTENT="James CE Johnson"> + <TITLE>ACE Tutorial 013</TITLE> +</HEAD> +<BODY TEXT="#000000" BGCOLOR="#FFFFFF" LINK="#000FFF" VLINK="#FF0F0F"> + +<CENTER><B><FONT SIZE=+2>ACE Tutorial 013</FONT></B></CENTER> + +<CENTER><B><FONT SIZE=+2>Multiple thread pools</FONT></B></CENTER> + + +<P> +<HR WIDTH="100%"> +<P> +And that's the end of another tutorial. This one is probably the most +complicated so far because I've introduced or expanded upon +a number of different +concepts. Namely: state machines, reference counting and task +chaining. I hope I didn't complicate things to the point where the +lesson got lost in the noise. As always, feel free to drop a note to +the ACE-Users mailing list if you feel that some of this could use a +little more explaination. + +<P> +<UL> +<LI><A HREF="Makefile">Makefile</A> +<LI><A HREF="block.cpp">block.cpp</A> +<LI><A HREF="block.h">block.h</A> +<LI><A HREF="message_queue">message_queue</A> +<LI><A HREF="message_queue.cpp">message_queue.cpp</A> +<LI><A HREF="mld.cpp">mld.cpp</A> +<LI><A HREF="mld.h">mld.h</A> +<LI><A HREF="task.cpp">task.cpp</A> +<LI><A HREF="task.h">task.h</A> +<LI><A HREF="work.cpp">work.cpp</A> +<LI><A HREF="work.h">work.h</A> +</UL> +<P> +<HR WIDTH="100%"> +<CENTER>[<A HREF="..">Tutorial Index</A>]</CENTER> + +</BODY> +</HTML> diff --git a/docs/tutorials/013/task.cpp b/docs/tutorials/013/task.cpp index 399d0ff5029..6ea62ef7de7 100644 --- a/docs/tutorials/013/task.cpp +++ b/docs/tutorials/013/task.cpp @@ -6,185 +6,188 @@ #include "work.h" /* - Construct the Task with zero or more subtasks. If subtasks are - requested, we assign our next_ pointer to the first of those and let - it worry about any remaining subtasks. + Construct the Task with zero or more subtasks. 
If subtasks are requested, + we assign our next_ pointer to the first of those and let it worry about + any remaining subtasks. */ -Task::Task ( int sub_tasks ) +Task::Task (int sub_tasks) : barrier_ (0) - ,next_(0) + ,next_ (0) { ACE_DEBUG ((LM_DEBUG, "(%P|%t) Task ctor 0x%x\n", (void *) this)); - if( sub_tasks ) + if (sub_tasks) { - next_ = new Task( --sub_tasks ); + next_ = new Task (--sub_tasks); } } /* - Delete our barrier object and any subtasks we may have. -*/ + Delete our barrier object and any subtasks we may have. + */ Task::~Task (void) { - ACE_DEBUG ((LM_DEBUG, "(%P|%t) Task dtor 0x%x\n", (void *) this)); + ACE_DEBUG ((LM_DEBUG, "(%P|%t) Task dtor 0x%x\n", (void *) this)); - delete barrier_; - delete next_; + delete barrier_; + delete next_; } /* - Open our thread pool with the requested number of threads. If - subtasks are enabled, they inherit the thread-pool size. Make sure - that the subtasks can be opened before we open our own threadpool. -*/ -int Task::open (int threads ) + Open our thread pool with the requested number of threads. If subtasks are + enabled, they inherit the thread-pool size. Make sure that the subtasks can + be opened before we open our own threadpool. + */ +int Task::open (int threads) { - if( next_ ) + if (next_) { - if( next_->open(threads) == -1 ) - { - return -1; - } + if (next_->open (threads) == -1) + { + return -1; + } } - + barrier_ = new ACE_Barrier (threads); return this->activate (THR_NEW_LWP, threads); } /* - Close ourselves and any subtasks. This just prints a message so - that we can assure ourselves things are cleaned up correctly. -*/ + Close ourselves and any subtasks. This just prints a message so that we can + assure ourselves things are cleaned up correctly. + */ int Task::close (u_long flags) { ACE_DEBUG ((LM_DEBUG, "(%P|%t) Task close 0x%x\n", (void *) this)); - if( next_ ) + if (next_) { - next_->close(flags); + next_->close (flags); } - - return(0); + + return (0); } /* - Wait for all of the threads in our pool to exit and then wait for - any subtasks. When called from the front of the task chain, this - won't return until all thread pools in the chain have exited. -*/ -int Task::wait(void) + Wait for all of the threads in our pool to exit and then wait for any + subtasks. When called from the front of the task chain, this won't return + until all thread pools in the chain have exited. + */ +int Task::wait (void) { - inherited::wait(); - if( next_ ) - { - next_->wait(); - } - return(0); + inherited::wait (); + if (next_) + { + next_->wait (); + } + return (0); } /* - Like the thread-pools before, this is where all of the work is done. -*/ + Like the thread-pools before, this is where all of the work is done. + */ int Task::svc (void) { - // Wait for all threads to get this far before continuing. - this->barrier_->wait (); - - ACE_DEBUG ((LM_DEBUG, "(%P|%t) Task 0x%x starts in thread %u\n", (void *) this, ACE_Thread::self ())); - - // getq() wants an ACE_Message_Block so we'll start out with one - // of those. We could do some casting (or even auto-casting) to - // avoid the extra variable but I prefer to be clear about our actions. - ACE_Message_Block * message; - // What we really put into the queue was our Message_Block. - // After we get the message from the queue, we'll cast it to this - // so that we know how to work on it. 
- Message_Block * message_block; - // And, of course, our Message_Block contains our Data_Block - // instead of the typical ACE_Data_Block - Data_Block * data_block; - // Even though we put Work objects into the queue, we take them - // out using the baseclass pointer. This allows us to create new - // derivatives without having to change this svc() method. - Unit_Of_Work * work; - - while (1) + // Wait for all threads to get this far before continuing. + this->barrier_->wait (); + + ACE_DEBUG ((LM_DEBUG, "(%P|%t) Task 0x%x starts in thread %u\n", (void *) this, ACE_Thread::self ())); + + // getq() wants an ACE_Message_Block so we'll start out with one + // of those. We could do some casting (or even auto-casting) to + // avoid the extra variable but I prefer to be clear about our actions. + ACE_Message_Block *message; + + // What we really put into the queue was our Message_Block. + // After we get the message from the queue, we'll cast it to this + // so that we know how to work on it. + Message_Block *message_block; + + // And, of course, our Message_Block contains our Data_Block + // instead of the typical ACE_Data_Block + Data_Block *data_block; + + // Even though we put Work objects into the queue, we take them + // out using the baseclass pointer. This allows us to create new + // derivatives without having to change this svc() method. + Unit_Of_Work *work; + + while (1) + { + // Get the ACE_Message_Block + if (this->getq (message) == -1) { - // Get the ACE_Message_Block - if (this->getq (message) == -1) - { - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "getq"), -1); - } - - // "Convert" it to our Message_Block - message_block = (Message_Block*)message; - - // Get the ACE_Data_Block and "convert" to Data_Block in one step. - data_block = (Data_Block*)(message_block->data_block()); - - // Get the unit of work from the data block - work = data_block->data(); - - // Show the object's instance value and "type name" - work->who_am_i(); - work->what_am_i(); - - // If there is a hangup we need to tell our pool-peers as - // well as any subtasks. - if (message_block->msg_type () == ACE_Message_Block::MB_HANGUP) - { - // duplicate()ing the message block will increment the - // reference counts on the data blocks. This allows us - // to safely release() the message block. The rule of - // thumb is that if you pass a message block to a new - // owner, duplicate() it. Then you can release() when - // you're done and not worry about memory leaks. - if( this->putq (message_block->duplicate()) == -1 ) - { - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "putq"), -1); - } - - // If we have a subtask, duplicate() the message block - // again and pass it to that task's queue - if( next_ && next_->putq(message_block->duplicate()) == -1 ) - { - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "putq"), -1); - } - - // We're now done with our copy of the block, so we can - // release it. Our peers/subtasks have their own message - // block to access the shared data blocks. - message_block->release(); - - break; - } - - // If this isn't a hangup/shutdown message then we tell the - // unit of work to process() for a while. - work->process(); - - if( next_ ) - { - // If we have subtasks, we pass the block on to them. Notice - // that I don't bother to duplicate() the block since I won't - // release it in this case. I could have invoked - // duplicate() in the puq() and then release() - // afterwards. Either is acceptable. 
- if( next_->putq(message_block) == -1 ) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "putq"), -1); - } - else - { - // If we don't have subtasks then invoke fini() to tell - // the unit of work that we won't be invoking process() - // any more. Then release() the block. This release() - // would not change if we duplicate()ed in the above conditional - work->fini(); - message_block->release(); - } - - // Pretend that the work takes some time... - ACE_OS::sleep (ACE_Time_Value (0, 250)); + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "getq"), -1); } - return (0); + // "Convert" it to our Message_Block + message_block = (Message_Block *) message; + + // Get the ACE_Data_Block and "convert" to Data_Block in one step. + data_block = (Data_Block *) (message_block->data_block ()); + + // Get the unit of work from the data block + work = data_block->data (); + + // Show the object's instance value and "type name" + work->who_am_i (); + work->what_am_i (); + + // If there is a hangup we need to tell our pool-peers as + // well as any subtasks. + if (message_block->msg_type () == ACE_Message_Block::MB_HANGUP) + { + // duplicate()ing the message block will increment the + // reference counts on the data blocks. This allows us + // to safely release() the message block. The rule of + // thumb is that if you pass a message block to a new + // owner, duplicate() it. Then you can release() when + // you're done and not worry about memory leaks. + if (this->putq (message_block->duplicate ()) == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "putq"), -1); + } + + // If we have a subtask, duplicate() the message block + // again and pass it to that task's queue + if (next_ && next_->putq (message_block->duplicate ()) == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "putq"), -1); + } + + // We're now done with our copy of the block, so we can + // release it. Our peers/subtasks have their own message + // block to access the shared data blocks. + message_block->release (); + + break; + } + + // If this isn't a hangup/shutdown message then we tell the + // unit of work to process() for a while. + work->process (); + + if (next_) + { + // If we have subtasks, we pass the block on to them. Notice + // that I don't bother to duplicate() the block since I won't + // release it in this case. I could have invoked + // duplicate() in the puq() and then release() + // afterwards. Either is acceptable. + if (next_->putq (message_block) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "putq"), -1); + } + else + { + // If we don't have subtasks then invoke fini() to tell + // the unit of work that we won't be invoking process() + // any more. Then release() the block. This release() + // would not change if we duplicate()ed in the above conditional + work->fini (); + message_block->release (); + } + + // Pretend that the work takes some time... + ACE_OS::sleep (ACE_Time_Value (0, 250)); + } + + return (0); } diff --git a/docs/tutorials/013/task.h b/docs/tutorials/013/task.h index 688b739a765..0c813641169 100644 --- a/docs/tutorials/013/task.h +++ b/docs/tutorials/013/task.h @@ -8,40 +8,40 @@ #include "mld.h" /* - This is much like the Task we've used in the past for implementing a - thread pool. This time, however, I've made the Task an element in a - singly-linked list. As the svc() method finishes the process() on a - unit of work, it will enqueue that unit of work to the next_ Task if - there is one. If the Task does not have a next_ Task, it will - invoke the unit of work object's fini() method after invoking process(). 
+ This is much like the Task we've used in the past for implementing a thread + pool. This time, however, I've made the Task an element in a singly-linked + list. As the svc() method finishes the process() on a unit of work, it + will enqueue that unit of work to the next_ Task if there is one. If the + Task does not have a next_ Task, it will invoke the unit of work object's + fini() method after invoking process(). */ class Task : public ACE_Task < ACE_MT_SYNCH > { public: - typedef ACE_Task < ACE_MT_SYNCH > inherited; + typedef ACE_Task < ACE_MT_SYNCH > inherited; - // Construct ourselves and an optional number of subtasks - // chained beyond us. - Task ( int sub_tasks = 0 ); - ~Task (void); + // Construct ourselves and an optional number of subtasks + // chained beyond us. + Task (int sub_tasks = 0); + ~Task (void); - // Open the Task with the proper thread-pool size - int open (int threads = 1 ); + // Open the Task with the proper thread-pool size + int open (int threads = 1); - // Take Unit_Of_Work objects from the thread pool and invoke - // their process() and/or fini() as appropriate. - int svc (void); + // Take Unit_Of_Work objects from the thread pool and invoke + // their process() and/or fini() as appropriate. + int svc (void); - // Shut down the thread pool and it's associated subtasks - int close (u_long flags = 0); + // Shut down the thread pool and it's associated subtasks + int close (u_long flags = 0); + + // Wait for the pool and subtasks to close + int wait (void); - // Wait for the pool and subtasks to close - int wait(void); - protected: ACE_Barrier * barrier_; - Task * next_; + Task *next_; MLD; }; diff --git a/docs/tutorials/013/work.cpp b/docs/tutorials/013/work.cpp index f4f299c59a3..332353052a3 100644 --- a/docs/tutorials/013/work.cpp +++ b/docs/tutorials/013/work.cpp @@ -4,122 +4,121 @@ #include "work.h" /* - Initialize the state to zero -*/ + Initialize the state to zero + */ Unit_Of_Work::Unit_Of_Work (void) - : state_(0) +: state_ (0) { - ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Unit_Of_Work ctor\n", (void *) this)); + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Unit_Of_Work ctor\n", (void *) this)); } Unit_Of_Work::~Unit_Of_Work (void) { - ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Unit_Of_Work dtor\n", (void *) this )); + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Unit_Of_Work dtor\n", (void *) this)); } /* - Display our instance value -*/ + Display our instance value + */ void Unit_Of_Work::who_am_i (void) { - ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Unit_Of_Work instance\n", (void *) this)); + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Unit_Of_Work instance\n", (void *) this)); } /* - Dispay our type name -*/ + Dispay our type name + */ void Unit_Of_Work::what_am_i (void) { - ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x I am a Unit_Of_Work object\n", (void*)this)); + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x I am a Unit_Of_Work object\n", (void *) this)); } /* - Return failure. You should always derive from Unit_Of_Work... -*/ -int Unit_Of_Work::process(void) + Return failure. You should always derive from Unit_Of_Work... 
+ */ +int Unit_Of_Work::process (void) { - return -1; + return -1; } /* - ditto -*/ -int Unit_Of_Work::fini(void) + ditto + */ +int Unit_Of_Work::fini (void) { - return -1; + return -1; } /* - Default constructor has no "message number" -*/ + Default constructor has no "message number" + */ Work::Work (void) - : message_ (-1) +:message_ (-1) { - ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Work ctor\n", (void *) this)); + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Work ctor\n", (void *) this)); } /* - The useful constructor remembers which message it is and will tell - you if you ask. -*/ + The useful constructor remembers which message it is and will tell you if + you ask. + */ Work::Work (int message) - : message_ (message) +: message_ (message) { - ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Work ctor for message %d\n", (void *) this, message_)); + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Work ctor for message %d\n", (void *) this, message_)); } Work::~Work (void) { - ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Work dtor\n", (void *) this)); + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Work dtor\n", (void *) this)); } /* - This objects type name is different from the baseclass -*/ + This objects type name is different from the baseclass + */ void Work::what_am_i (void) { - ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x I am a Work object for message %d\n", (void*)this, message_)); + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x I am a Work object for message %d\n", (void *) this, message_)); } /* - A very simple state machine that just walks through three stages. - If it is called more than that, it will tell you not to bother. -*/ -int Work::process(void) + A very simple state machine that just walks through three stages. If it is + called more than that, it will tell you not to bother. + */ +int Work::process (void) { - switch( ++state_ ) - { - case 1: - ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Stage One\n",(void*)this)); - break; - case 2: - ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Stage Two\n",(void*)this)); - break; - case 3: - ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Stage Three\n",(void*)this)); - break; - default: - ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x No work to do in state %d\n", - (void*)this, state_.value())); - break; - } - return(0); + switch (++state_) + { + case 1: + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Stage One\n", (void *) this)); + break; + case 2: + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Stage Two\n", (void *) this)); + break; + case 3: + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Stage Three\n", (void *) this)); + break; + default: + ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x No work to do in state %d\n", + (void *) this, state_.value ())); + break; + } + return (0); } /* - If you don't have enough subtasks in the chain then the state - machine won't progress to the end. The fini() hook will allow us to - recover from that by executing the remaining states in the final - task of the chain. -*/ -int Work::fini(void) + If you don't have enough subtasks in the chain then the state machine won't + progress to the end. The fini() hook will allow us to recover from that by + executing the remaining states in the final task of the chain. 
+ */ +int Work::fini (void) { - while( state_.value() < 3 ) + while (state_.value () < 3) + { + if (this->process () == -1) { - if( this->process() == -1 ) - { - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "process"), -1); - } + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "process"), -1); } - return(0); + } + return (0); } diff --git a/docs/tutorials/013/work.h b/docs/tutorials/013/work.h index 6cdaa42d104..26e10b96266 100644 --- a/docs/tutorials/013/work.h +++ b/docs/tutorials/013/work.h @@ -9,58 +9,58 @@ #include "mld.h" /* - Our specilized message queue and thread pool will know how to do - "work" on our Unit_Of_Work baseclass. -*/ + Our specilized message queue and thread pool will know how to do "work" on + our Unit_Of_Work baseclass. + */ class Unit_Of_Work { public: - Unit_Of_Work (void); - + Unit_Of_Work (void); + virtual ~ Unit_Of_Work (void); - // Display the object instance value - void who_am_i (void); + // Display the object instance value + void who_am_i (void); + + // The baseclass can override this to show it's "type name" + virtual void what_am_i (void); - // The baseclass can override this to show it's "type name" - virtual void what_am_i (void); + // This is where you do application level logic. It will be + // called once for each thread pool it passes through. It + // would typically implement a state machine and execute a + // different state on each call. + virtual int process (void); - // This is where you do application level logic. It will be - // called once for each thread pool it passes through. It - // would typically implement a state machine and execute a - // different state on each call. - virtual int process(void); + // This is called by the last Task in the series (see task.h) + // in case our process() didn't get through all of it's states. + virtual int fini (void); - // This is called by the last Task in the series (see task.h) - // in case our process() didn't get through all of it's states. - virtual int fini(void); - protected: - ACE_Atomic_Op<ACE_Mutex,int> state_; + ACE_Atomic_Op < ACE_Mutex, int >state_; MLD; }; /* - A fairly trivial work derivative that implements an equally trivial - state machine in process() -*/ + A fairly trivial work derivative that implements an equally trivial state + machine in process() + */ class Work : public Unit_Of_Work { public: - Work (void); - + Work (void); + Work (int message); - + virtual ~ Work (void); - - void what_am_i (void); - - int process(void); - - int fini(void); + + void what_am_i (void); + + int process (void); + + int fini (void); protected: - int message_; + int message_; MLD; }; |
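The recurring idea in this commit -- duplicate() the Message_Block for every new owner so the shared Data_Block's reference count keeps the Unit_Of_Work alive, then release() your own copy -- can be summed up in a short sketch. This is illustrative only and is not part of the commit; the helper pass_along() and its arguments are hypothetical, but Message_Block, Work, and Task are the classes added above in block.h, work.h, and task.h.

#include "task.h"
#include "block.h"
#include "work.h"

// Hypothetical helper: hand one unit of work to two independent consumers.
int pass_along (Task &first, Task &second)
{
  // One Data_Block (wrapping the Work) will be shared by both queues.
  Message_Block *message = new Message_Block (new Work (42));

  // duplicate() increments the Data_Block's reference count, so each
  // consumer gets its own Message_Block over the same shared data.
  if (first.putq (message->duplicate ()) == -1)
    ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "putq"), -1);

  if (second.putq (message->duplicate ()) == -1)
    ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "putq"), -1);

  // Drop our own reference.  The Data_Block (and the Work it owns) is
  // deleted only when the last holder calls release().
  message->release ();

  return 0;
}

This is the same pattern Task::svc() above uses when it forwards a hangup block both to its pool peers and to next_: duplicate() for each putq(), then release() the original without worrying about who frees the shared data.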