diff options
author | schmidt <douglascraigschmidt@users.noreply.github.com> | 2001-12-24 20:51:06 +0000 |
---|---|---|
committer | schmidt <douglascraigschmidt@users.noreply.github.com> | 2001-12-24 20:51:06 +0000 |
commit | 54d6d5e3665b542c6c7838b54b06d6c9dae70a63 (patch) | |
tree | 634bcff1db7a7bc9840352f115f66114371477cc /docs | |
parent | 4a1e3836246b1d5a694d5f528ebc81818ca669ec (diff) | |
download | ATCD-54d6d5e3665b542c6c7838b54b06d6c9dae70a63.tar.gz |
ChangeLogTag:Mon Dec 24 08:08:40 2001 Douglas C. Schmidt <schmidt@macarena.cs.wustl.edu>
Diffstat (limited to 'docs')
-rw-r--r-- | docs/tutorials/014/EndTask.h | 23 | ||||
-rw-r--r-- | docs/tutorials/014/Task.cpp | 149 | ||||
-rw-r--r-- | docs/tutorials/014/Task.h | 7 | ||||
-rw-r--r-- | docs/tutorials/014/page02.html | 7 | ||||
-rw-r--r-- | docs/tutorials/014/page03.html | 145 | ||||
-rw-r--r-- | docs/tutorials/014/page04.html | 19 | ||||
-rw-r--r-- | docs/tutorials/014/page05.html | 84 | ||||
-rw-r--r-- | docs/tutorials/014/stream.cpp | 79 |
8 files changed, 277 insertions, 236 deletions
diff --git a/docs/tutorials/014/EndTask.h b/docs/tutorials/014/EndTask.h index a8bab2698b5..3ee76f1672c 100644 --- a/docs/tutorials/014/EndTask.h +++ b/docs/tutorials/014/EndTask.h @@ -1,12 +1,8 @@ // $Id$ -// EndTask.h -// // Tutorial regarding a way to use ACE_Stream. // // written by bob mcwhirter (bob@netwrench.com) -// -// #ifndef ENDTASK_H #define ENDTASK_H @@ -21,7 +17,7 @@ // your first module, and a Stream_Tail behind your // last module. // -// If your put() a message to the Stream Tail, it +// If your put () a message to the Stream Tail, it // assumes you did so in error. This simple EndTask // class allows you to push a message to it and just // have it safely Go Away. @@ -36,11 +32,11 @@ public: EndTask (const char *nameOfTask): inherited (nameOfTask, 0) { - // when we get open()'d, it with 0 threads since there is actually + // when we get open ()'d, it with 0 threads since there is actually // no processing to do. ACE_DEBUG ((LM_INFO, - "(%P|%t) Line: %d, File: %s\n", + " (%P|%t) Line: %d, File: %s\n", __LINE__, __FILE__)); } @@ -48,7 +44,7 @@ public: virtual int open (void *) { ACE_DEBUG ((LM_INFO, - "(%P|%t) Line: %d, File: %s\n", + " (%P|%t) Line: %d, File: %s\n", __LINE__, __FILE__)); return 0; @@ -57,13 +53,13 @@ public: virtual int open (void) { ACE_DEBUG ((LM_INFO, - "(%P|%t) Line: %d, File: %s\n", + " (%P|%t) Line: %d, File: %s\n", __LINE__, __FILE__)); return 0; } - virtual ~EndTask(void) + virtual ~EndTask (void) { } @@ -71,19 +67,18 @@ public: ACE_Time_Value *timeout) { ACE_DEBUG ((LM_INFO, - "(%P|%t) Line: %d, File: %s\n", + " (%P|%t) Line: %d, File: %s\n", __LINE__, __FILE__)); ACE_UNUSED_ARG (timeout); - // we don't have anything to do, so release() the message. + // we don't have anything to do, so release () the message. ACE_DEBUG ((LM_DEBUG, - "(%P|%t) %s EndTask::put() -- releasing Message_Block\n", + " (%P|%t) %s EndTask::put () -- releasing Message_Block\n", this->nameOfTask ())); message->release (); return 0; } - }; #endif /* ENDTASK_H */ diff --git a/docs/tutorials/014/Task.cpp b/docs/tutorials/014/Task.cpp index f1f3d6fdc33..6732bfa622b 100644 --- a/docs/tutorials/014/Task.cpp +++ b/docs/tutorials/014/Task.cpp @@ -1,167 +1,158 @@ - // $Id$ -// Task.cxx -// // Tutorial regarding a way to use ACE_Stream. // // written by bob mcwhirter (bob@netwrench.com) -// -// #include <ace/Message_Block.h> #include "Task.h" -Task::Task(const char * nameOfTask, - int numberOfThreads) - : d_numberOfThreads(numberOfThreads), - d_barrier(numberOfThreads) +Task::Task (const char * nameOfTask, + int numberOfThreads) + : d_numberOfThreads (numberOfThreads), + d_barrier (numberOfThreads) { // Just initialize our name, number of threads, and barrier. - ACE_OS::strcpy(d_nameOfTask, nameOfTask); + ACE_OS::strcpy (d_nameOfTask, nameOfTask); } -Task::~Task(void) +Task::~Task (void) { - ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::~Task() -- once per Task\n", d_nameOfTask)); + ACE_DEBUG ((LM_DEBUG, " (%P|%t) %s Task::~Task () -- once per Task\n", d_nameOfTask)); } -int Task::open(void *arg) +int Task::open (void *arg) { - ACE_UNUSED_ARG(arg); + ACE_UNUSED_ARG (arg); - ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::open() -- once per Task\n", d_nameOfTask)); + ACE_DEBUG ((LM_DEBUG, " (%P|%t) %s Task::open () -- once per Task\n", d_nameOfTask)); - // call ACE_Task::activate() to spawn the threads using - // our Task::svc() as the function to be run. + // call ACE_Task::activate () to spawn the threads using + // our Task::svc () as the function to be run. - // No need to use THR_DETACHED here, we're going to wait() + // No need to use THR_DETACHED here, we're going to wait () // for the threads to exit later. No leaks. - return this->activate(THR_NEW_LWP, d_numberOfThreads); + return this->activate (THR_NEW_LWP, d_numberOfThreads); } -int Task::put(ACE_Message_Block *message, +int Task::put (ACE_Message_Block *message, ACE_Time_Value *timeout) { - // ACE_Stream uses the put() method of Tasks to send messages. - // This defaultly does nothing. Here we link our put() method - // directly to our putq() method, so that Messages put() to us + // ACE_Stream uses the put () method of Tasks to send messages. + // This defaultly does nothing. Here we link our put () method + // directly to our putq () method, so that Messages put () to us // will appear in the Message_Queue that is checked by the // service threads. - return this->putq(message, timeout); + return this->putq (message, timeout); } -int Task::close(u_long flags) +int Task::close (u_long flags) { - - // When the Stream closes the Module, the Module then close()'s the Task + // When the Stream closes the Module, the Module then close ()'s the Task // and passing a value of (1) as the flag. - // When a service thread exits, it calls close() with a value that is not + // When a service thread exits, it calls close () with a value that is not // (1). // We use this fact to tell the difference between closing a service thread, // and closing the main Task itself. if (flags == 1) { - // The Module has asked to close the main Task. - ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::close() -- flags == 1 -- once per Task\n", d_nameOfTask)); + ACE_DEBUG ((LM_DEBUG, " (%P|%t) %s Task::close () -- flags == 1 -- once per Task\n", d_nameOfTask)); - // We create a Message_Block... + // We create a Message_Block of type MB_HANGUP. - ACE_Message_Block *hangupBlock = new ACE_Message_Block(); + ACE_Message_Block *hangupBlock; - // And make it of the type MB_HANGUP. - - hangupBlock->msg_type(ACE_Message_Block::MB_HANGUP); + // Note the use of the lock_adapter () to ensure proper serialization. + ACE_NEW_RETURN (hangupBlock, + ACE_Message_Block (0, + ACE_Message_Block::MB_HANGUP, + 0, + 0, + 0, + Task::lock_adapter ()), + -1); // We then send this Block into the Message_Queue to be seen by the // service threads. - // Once again we duplicate() the Block as send it off... + // Once again we duplicate () the Block as send it off... - if (this->putq(hangupBlock->duplicate()) == -1) { - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "Task::close() putq"), -1); + if (this->putq (hangupBlock->duplicate ()) == -1) { + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "Task::close () putq"), -1); } - // ..and we're free to release() our copy of it. + // ..and we're free to release () our copy of it. - hangupBlock->release(); + hangupBlock->release (); - // Now, all we have to do is wait() for the service threads to all - // exit. This is where using THR_DETACHED in the activate() method + // Now, all we have to do is wait () for the service threads to all + // exit. This is where using THR_DETACHED in the activate () method // will come back to haunt you. // The Stream waits until this returns before attempting to remove // the next Module/Task group in the Stream. This allows for an // orderly shutting down of the Stream. - return this->wait(); - - + return this->wait (); } else { - - ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::close() -- flags != 1 -- once per servicing thread\n", d_nameOfTask)); + ACE_DEBUG ((LM_DEBUG, " (%P|%t) %s Task::close () -- flags != 1 -- once per servicing thread\n", d_nameOfTask)); // This is where we can clean up any mess left over by each service thread. // In this Task, there is nothing to do. - } - return 0; - } -int Task::svc(void) +int Task::svc (void) { - // This is the function that our service threads run once they are spawned. - ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::svc() -- once per servicing thread\n", d_nameOfTask)); + ACE_DEBUG ((LM_DEBUG, " (%P|%t) %s Task::svc () -- once per servicing thread\n", d_nameOfTask)); // First, we wait until all of our peer service threads have arrived // at this point also. - d_barrier.wait(); + d_barrier.wait (); ACE_Message_Block *messageBlock; while (1) { - // And now we loop almost infinitely. - // getq() will block until a Message_Block is available to be read, + // getq () will block until a Message_Block is available to be read, // or an error occurs. - if ( this->getq(messageBlock, 0) == -1) { - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "Task::svc() getq"), -1); + if ( this->getq (messageBlock, 0) == -1) { + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "Task::svc () getq"), -1); } - if (messageBlock->msg_type() == ACE_Message_Block::MB_HANGUP) { + if (messageBlock->msg_type () == ACE_Message_Block::MB_HANGUP) { // If the Message_Block is of type MB_HANGUP, then we're being asked // to shut down nicely. - ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::svc() -- HANGUP block received\n", d_nameOfTask)); + ACE_DEBUG ((LM_DEBUG, " (%P|%t) %s Task::svc () -- HANGUP block received\n", d_nameOfTask)); // So, we duplicate the Block, and put it back into the Message_Queue, // in case there are some more peer service threads still running. - if (this->putq(messageBlock->duplicate()) == -1) { - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "Task::svc() putq"), -1); + if (this->putq (messageBlock->duplicate ()) == -1) { + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "Task::svc () putq"), -1); } // We release our copy of the Block. - messageBlock->release(); + messageBlock->release (); // And we break out of the nearly infinitely loop, and - // head towards close() ourselves. + // head towards close () ourselves. break; } @@ -169,11 +160,11 @@ int Task::svc(void) // not informing us to quit, so we're assuming it's a valid // meaningful Block. - ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::svc() -- Normal block received\n", d_nameOfTask)); + ACE_DEBUG ((LM_DEBUG, " (%P|%t) %s Task::svc () -- Normal block received\n", d_nameOfTask)); // We grab the read-pointer from the Block, and display it through a DEBUG statement. - ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::svc() -- %s\n", d_nameOfTask, messageBlock->rd_ptr() )); + ACE_DEBUG ((LM_DEBUG, " (%P|%t) %s Task::svc () -- %s\n", d_nameOfTask, messageBlock->rd_ptr () )); // We pretend that this takes to time to process the Block. // If you're on a fast machine, you might have to raise this @@ -185,22 +176,32 @@ int Task::svc(void) // Since we're part of a Stream, we duplicate the Block, and // send it on to the next Task. - if (put_next(messageBlock->duplicate()) == -1) { - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "Task::svc() put_next"), -1); + if (put_next (messageBlock->duplicate ()) == -1) { + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "Task::svc () put_next"), -1); } // And then we release our copy of it. - messageBlock->release(); - + messageBlock->release (); } - return 0; - } - -const char * Task::nameOfTask(void) const +const char *Task::nameOfTask (void) const { return d_nameOfTask; } + +ACE_Lock_Adapter<ACE_SYNCH_MUTEX> *Task::lock_adapter (void) +{ + return &lock_adapter_; +} + +// Static definition. +ACE_Lock_Adapter<ACE_SYNCH_MUTEX> Task::lock_adapter_; + +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) +template class ACE_Lock_Adapter <ACE_SYNCH_MUTEX>; +#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) +#pragma instantiate ACE_Lock_Adapter <ACE_SYNCH_MUTEX>; +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/docs/tutorials/014/Task.h b/docs/tutorials/014/Task.h index ae02abff1b5..765496598e5 100644 --- a/docs/tutorials/014/Task.h +++ b/docs/tutorials/014/Task.h @@ -47,6 +47,9 @@ public: const char *nameOfTask (void) const; // Returns the name of this Task. + static ACE_Lock_Adapter<ACE_SYNCH_MUTEX> *lock_adapter (void); + // Returns a pointer to the lock adapter. + private: int d_numberOfThreads; char d_nameOfTask[64]; @@ -54,6 +57,10 @@ private: ACE_Barrier d_barrier; // Simple Barrier to make sure all of our service threads have // entered their loop before accepting any messages. + + static ACE_Lock_Adapter<ACE_SYNCH_MUTEX> lock_adapter_; + // This Lock_Adapter is used to synchronize operations + // on the ACE_Message_Block objects }; #endif /* TASK_H */ diff --git a/docs/tutorials/014/page02.html b/docs/tutorials/014/page02.html index 7184d845eaa..47126503170 100644 --- a/docs/tutorials/014/page02.html +++ b/docs/tutorials/014/page02.html @@ -69,6 +69,9 @@ public: const char *nameOfTask (void) const; <font color=red>// Returns the name of this Task.</font> + static ACE_Lock_Adapter<ACE_SYNCH_MUTEX> *lock_adapter (void); + <font color=red>// Returns a pointer to the lock adapter.</font> + private: int d_numberOfThreads; char d_nameOfTask[64]; @@ -76,6 +79,10 @@ private: ACE_Barrier d_barrier; <font color=red>// Simple Barrier to make sure all of our service threads have</font> <font color=red>// entered their loop before accepting any messages.</font> + + static ACE_Lock_Adapter<ACE_SYNCH_MUTEX> lock_adapter_; + <font color=red>// This Lock_Adapter is used to synchronize operations</font> + <font color=red>// on the ACE_Message_Block objects</font> }; <font color=blue>#endif</font> <font color=red>/* TASK_H */</font> diff --git a/docs/tutorials/014/page03.html b/docs/tutorials/014/page03.html index c08ce6b05db..03d5a41fe14 100644 --- a/docs/tutorials/014/page03.html +++ b/docs/tutorials/014/page03.html @@ -14,18 +14,18 @@ <P> <HR WIDTH="100%"> <P> -Before we get to main() let's take a look at the Task implementation. +Before we get to main () let's take a look at the Task implementation. While we've overridden several methods, the real work is done in - the close() and svc() methods. + the close () and svc () methods. <P> -Notice how close() figures out if it is being called by the shutdown - of the ACE_Stream or by the exit of svc(). The magic here is +Notice how close () figures out if it is being called by the shutdown + of the ACE_Stream or by the exit of svc (). The magic here is provided by the <i>flags</i> parameter. By handling the stream shutdown in this way, we don't have to do anything strange in - svc(). We also don't end up with extra hangup messages in the + svc (). We also don't end up with extra hangup messages in the queue when the dust all settles down. <P> -Like our other tutorials, svc() looks for a hangup and processes data. +Like our other tutorials, svc () looks for a hangup and processes data. <P> <HR WIDTH="100%"> <PRE> @@ -44,55 +44,57 @@ Like our other tutorials, svc() looks for a hangup and processes data. <font color=blue>#include</font> "<font color=green>Task.h</font>" -<font color=#008888>Task::Task</font>(const char * nameOfTask, - int numberOfThreads) - : d_numberOfThreads(numberOfThreads), - d_barrier(numberOfThreads) +<font color=#008888>Task::Task</font> (const char * nameOfTask, + int numberOfThreads) + : d_numberOfThreads (numberOfThreads), + d_barrier (numberOfThreads) { <font color=red>// Just initialize our name, number of threads, and barrier.</font> - <font color=#008888>ACE_OS::strcpy</font>(d_nameOfTask, nameOfTask); + <font color=#008888>ACE_OS::strcpy</font> (d_nameOfTask, nameOfTask); } -<font color=#008888>Task::~Task</font>(void) +<font color=#008888>Task::~Task</font> (void) { - ACE_DEBUG ((LM_DEBUG, "<font color=green>(%P|%t) %s <font color=#008888>Task::~Task</font>() -- once per Task\n</font>", d_nameOfTask)); + ACE_DEBUG ((LM_DEBUG, "<font color=green> (%P|%t) %s <font +color=#008888>Task::~Task</font> () -- once per Task\n</font>", d_nameOfTask)); } -int <font color=#008888>Task::open</font>(void *arg) +int <font color=#008888>Task::open</font> (void *arg) { - ACE_UNUSED_ARG(arg); + ACE_UNUSED_ARG (arg); - ACE_DEBUG ((LM_DEBUG, "<font color=green>(%P|%t) %s <font color=#008888>Task::open</font>() -- once per Task\n</font>", d_nameOfTask)); + ACE_DEBUG ((LM_DEBUG, "<font color=green> (%P|%t) %s <font +color=#008888>Task::open</font> () -- once per Task\n</font>", d_nameOfTask)); - <font color=red>// call <font color=#008888>ACE_Task::activate</font>() to spawn the threads using</font> - <font color=red>// our <font color=#008888>Task::svc</font>() as the function to be run.</font> + <font color=red>// call <font color=#008888>ACE_Task::activate</font> () to spawn the threads using</font> + <font color=red>// our <font color=#008888>Task::svc</font> () as the function to be run.</font> - <font color=red>// No need to use THR_DETACHED here, we're going to wait()</font> + <font color=red>// No need to use THR_DETACHED here, we're going to wait ()</font> <font color=red>// for the threads to exit later. No leaks.</font> - return this->activate(THR_NEW_LWP, d_numberOfThreads); + return this->activate (THR_NEW_LWP, d_numberOfThreads); } -int <font color=#008888>Task::put</font>(ACE_Message_Block *message, +int <font color=#008888>Task::put</font> (ACE_Message_Block *message, ACE_Time_Value *timeout) { - <font color=red>// ACE_Stream uses the put() method of Tasks to send messages.</font> - <font color=red>// This defaultly does nothing. Here we link our put() method</font> - <font color=red>// directly to our putq() method, so that Messages put() to us</font> + <font color=red>// ACE_Stream uses the put () method of Tasks to send messages.</font> + <font color=red>// This defaultly does nothing. Here we link our put () method</font> + <font color=red>// directly to our putq () method, so that Messages put () to us</font> <font color=red>// will appear in the Message_Queue that is checked by the</font> <font color=red>// service threads.</font> - return this->putq(message, timeout); + return this->putq (message, timeout); } -int <font color=#008888>Task::close</font>(u_long flags) +int <font color=#008888>Task::close</font> (u_long flags) { - <font color=red>// When the Stream closes the Module, the Module then close()'s the Task</font> + <font color=red>// When the Stream closes the Module, the Module then close ()'s the Task</font> <font color=red>// and passing a value of (1) as the flag.</font> - <font color=red>// When a service thread exits, it calls close() with a value that is not</font> + <font color=red>// When a service thread exits, it calls close () with a value that is not</font> <font color=red>// (1).</font> <font color=red>// We use this fact to tell the difference between closing a service thread,</font> @@ -102,43 +104,51 @@ int <font color=#008888>Task::close</font>(u_long flags) <font color=red>// The Module has asked to close the main Task.</font> - ACE_DEBUG ((LM_DEBUG, "<font color=green>(%P|%t) %s <font color=#008888>Task::close</font>() -- flags == 1 -- once per Task\n</font>", d_nameOfTask)); + ACE_DEBUG ((LM_DEBUG, "<font color=green> (%P|%t) %s <font +color=#008888>Task::close</font> () -- flags == 1 -- once per Task\n</font>", d_nameOfTask)); - <font color=red>// We create a Message_Block...</font> + <font color=red>// We create a Message_Block of type MB_HANGUP.</font> - ACE_Message_Block *hangupBlock = new ACE_Message_Block(); + ACE_Message_Block *hangupBlock; - <font color=red>// And make it of the type MB_HANGUP.</font> - - hangupBlock->msg_type(<font color=#008888>ACE_Message_Block::MB_HANGUP</font>); + <font color=red>// Note the use of the lock_adapter () to ensure proper serialization.</font> + ACE_NEW_RETURN (hangupBlock, + ACE_Message_Block (0, + ACE_Message_Block::MB_HANGUP, + 0, + 0, + 0, + Task::lock_adapter ()), + -1); <font color=red>// We then send this Block into the Message_Queue to be seen by the</font> <font color=red>// service threads.</font> - <font color=red>// Once again we duplicate() the Block as send it off...</font> + <font color=red>// Once again we duplicate () the Block as send it off...</font> - if (this->putq(hangupBlock->duplicate()) == -1) { - ACE_ERROR_RETURN ((LM_ERROR, "<font color=green>%p\n</font>", "<font color=green><font color=#008888>Task::close</font>() putq</font>"), -1); + if (this->putq (hangupBlock->duplicate ()) == -1) { + ACE_ERROR_RETURN ((LM_ERROR, "<font color=green>%p\n</font>","<font color=green><font color=#008888>Task::close</font> () putq</font>"), -1); } - <font color=red>// ..and we're free to release() our copy of it.</font> + <font color=red>// ..and we're free to release () our copy of it.</font> - hangupBlock->release(); + hangupBlock->release (); - <font color=red>// Now, all we have to do is wait() for the service threads to all</font> - <font color=red>// exit. This is where using THR_DETACHED in the activate() method</font> + <font color=red>// Now, all we have to do is wait () for the service threads to all</font> + <font color=red>// exit. This is where using THR_DETACHED in the activate () method</font> <font color=red>// will come back to haunt you.</font> <font color=red>// The Stream waits until this returns before attempting to remove</font> <font color=red>// the next Module/Task group in the Stream. This allows for an</font> <font color=red>// orderly shutting down of the Stream.</font> - return this->wait(); + return this->wait (); } else { - ACE_DEBUG ((LM_DEBUG, "<font color=green>(%P|%t) %s <font color=#008888>Task::close</font>() -- flags != 1 -- once per servicing thread\n</font>", d_nameOfTask)); + ACE_DEBUG ((LM_DEBUG, "<font color=green> (%P|%t) %s <font +color=#008888>Task::close</font> () -- flags != 1 -- once per servicing thread\n</font>", d_nameOfTask)); <font color=red>// This is where we can clean up any mess left over by each service thread.</font> <font color=red>// In this Task, there is nothing to do.</font> @@ -149,17 +159,18 @@ int <font color=#008888>Task::close</font>(u_long flags) } -int <font color=#008888>Task::svc</font>(void) +int <font color=#008888>Task::svc</font> (void) { <font color=red>// This is the function that our service threads run once they are spawned.</font> - ACE_DEBUG ((LM_DEBUG, "<font color=green>(%P|%t) %s <font color=#008888>Task::svc</font>() -- once per servicing thread\n</font>", d_nameOfTask)); + ACE_DEBUG ((LM_DEBUG, "<font color=green> (%P|%t) %s <font +color=#008888>Task::svc</font> () -- once per servicing thread\n</font>", d_nameOfTask)); <font color=red>// First, we wait until all of our peer service threads have arrived</font> <font color=red>// at this point also.</font> - d_barrier.wait(); + d_barrier.wait (); ACE_Message_Block *messageBlock; @@ -167,32 +178,33 @@ int <font color=#008888>Task::svc</font>(void) <font color=red>// And now we loop almost infinitely.</font> - <font color=red>// getq() will block until a Message_Block is available to be read,</font> + <font color=red>// getq () will block until a Message_Block is available to be read,</font> <font color=red>// or an error occurs.</font> - if ( this->getq(messageBlock, 0) == -1) { - ACE_ERROR_RETURN ((LM_ERROR, "<font color=green>%p\n</font>", "<font color=green><font color=#008888>Task::svc</font>() getq</font>"), -1); + if ( this->getq (messageBlock, 0) == -1) { + ACE_ERROR_RETURN ((LM_ERROR, "<font color=green>%p\n</font>","<font color=green><font color=#008888>Task::svc</font> () getq</font>"), -1); } - if (messageBlock->msg_type() == <font color=#008888>ACE_Message_Block::MB_HANGUP</font>) { + if (messageBlock->msg_type () == <font color=#008888>ACE_Message_Block::MB_HANGUP</font>) { <font color=red>// If the Message_Block is of type MB_HANGUP, then we're being asked</font> <font color=red>// to shut down nicely.</font> - ACE_DEBUG ((LM_DEBUG, "<font color=green>(%P|%t) %s <font color=#008888>Task::svc</font>() -- HANGUP block received\n</font>", d_nameOfTask)); + ACE_DEBUG ((LM_DEBUG, "<font color=green> (%P|%t) %s <font +color=#008888>Task::svc</font> () -- HANGUP block received\n</font>", d_nameOfTask)); <font color=red>// So, we duplicate the Block, and put it back into the Message_Queue,</font> <font color=red>// in case there are some more peer service threads still running.</font> - if (this->putq(messageBlock->duplicate()) == -1) { - ACE_ERROR_RETURN ((LM_ERROR, "<font color=green>%p\n</font>", "<font color=green><font color=#008888>Task::svc</font>() putq</font>"), -1); + if (this->putq (messageBlock->duplicate ()) == -1) { + ACE_ERROR_RETURN ((LM_ERROR, "<font color=green>%p\n</font>","<font color=green><font color=#008888>Task::svc</font> () putq</font>"), -1); } <font color=red>// We release our copy of the Block.</font> - messageBlock->release(); + messageBlock->release (); <font color=red>// And we break out of the nearly infinitely loop, and</font> - <font color=red>// head towards close() ourselves.</font> + <font color=red>// head towards close () ourselves.</font> break; } @@ -200,11 +212,11 @@ int <font color=#008888>Task::svc</font>(void) <font color=red>// not informing us to quit, so we're assuming it's a valid</font> <font color=red>// meaningful Block.</font> - ACE_DEBUG ((LM_DEBUG, "<font color=green>(%P|%t) %s <font color=#008888>Task::svc</font>() -- Normal block received\n</font>", d_nameOfTask)); + ACE_DEBUG ((LM_DEBUG, "<font color=green> (%P|%t) %s <font color=#008888>Task::svc</font> () -- Normal block received\n</font>", d_nameOfTask)); <font color=red>// We grab the read-pointer from the Block, and display it through a DEBUG statement.</font> - ACE_DEBUG ((LM_DEBUG, "<font color=green>(%P|%t) %s <font color=#008888>Task::svc</font>() -- %s\n</font>", d_nameOfTask, messageBlock->rd_ptr() )); + ACE_DEBUG ((LM_DEBUG, "<font color=green> (%P|%t) %s <font color=#008888>Task::svc</font> () -- %s\n</font>", d_nameOfTask, messageBlock->rd_ptr () )); <font color=red>// We pretend that this takes to time to process the Block.</font> <font color=red>// If you're on a fast machine, you might have to raise this</font> @@ -216,13 +228,13 @@ int <font color=#008888>Task::svc</font>(void) <font color=red>// Since we're part of a Stream, we duplicate the Block, and</font> <font color=red>// send it on to the next Task.</font> - if (put_next(messageBlock->duplicate()) == -1) { - ACE_ERROR_RETURN ((LM_ERROR, "<font color=green>%p\n</font>", "<font color=green><font color=#008888>Task::svc</font>() put_next</font>"), -1); + if (put_next (messageBlock->duplicate ()) == -1) { + ACE_ERROR_RETURN ((LM_ERROR, "<font color=green>%p\n</font>","<font color=green><font color=#008888>Task::svc</font> () put_next</font>"), -1); } <font color=red>// And then we release our copy of it.</font> - messageBlock->release(); + messageBlock->release (); } @@ -230,11 +242,20 @@ int <font color=#008888>Task::svc</font>(void) } - -const char * <font color=#008888>Task::nameOfTask</font>(void) const +const char * <font color=#008888>Task::nameOfTask</font> (void) const { return d_nameOfTask; } + +ACE_Lock_Adapter<ACE_SYNCH_MUTEX> *<font +color=#008888>Task::lock_adapter</font> (void) +{ + return &lock_adapter_; +} + +<font color=red>// Static definition.</font> +ACE_Lock_Adapter<ACE_SYNCH_MUTEX> Task::lock_adapter_; + </PRE> <P><HR WIDTH="100%"> <CENTER>[<A HREF="../online-tutorials.html">Tutorial Index</A>] [<A HREF="page04.html">Continue This Tutorial</A>]</CENTER> diff --git a/docs/tutorials/014/page04.html b/docs/tutorials/014/page04.html index 40bc4e2ff96..5b06801de81 100644 --- a/docs/tutorials/014/page04.html +++ b/docs/tutorials/014/page04.html @@ -48,7 +48,7 @@ Read on... <font color=red>// your first module, and a Stream_Tail behind your</font> <font color=red>// last module.</font> <font color=red>//</font> -<font color=red>// If your put() a message to the Stream Tail, it</font> +<font color=red>// If your put () a message to the Stream Tail, it</font> <font color=red>// assumes you did so in error. This simple EndTask</font> <font color=red>// class allows you to push a message to it and just</font> <font color=red>// have it safely Go Away.</font> @@ -63,11 +63,11 @@ public: EndTask (const char *nameOfTask): inherited (nameOfTask, 0) { - <font color=red>// when we get open()'d, it with 0 threads since there is actually</font> + <font color=red>// when we get open ()'d, it with 0 threads since there is actually</font> <font color=red>// no processing to do.</font> ACE_DEBUG ((LM_INFO, - "<font color=green>(%P|%t) Line: %d, File: %s\n</font>", + "<font color=green> (%P|%t) Line: %d, File: %s\n</font>", __LINE__, __FILE__)); } @@ -75,7 +75,7 @@ public: virtual int open (void *) { ACE_DEBUG ((LM_INFO, - "<font color=green>(%P|%t) Line: %d, File: %s\n</font>", + "<font color=green> (%P|%t) Line: %d, File: %s\n</font>", __LINE__, __FILE__)); return 0; @@ -84,13 +84,13 @@ public: virtual int open (void) { ACE_DEBUG ((LM_INFO, - "<font color=green>(%P|%t) Line: %d, File: %s\n</font>", + "<font color=green> (%P|%t) Line: %d, File: %s\n</font>", __LINE__, __FILE__)); return 0; } - virtual ~EndTask(void) + virtual ~EndTask (void) { } @@ -98,14 +98,15 @@ public: ACE_Time_Value *timeout) { ACE_DEBUG ((LM_INFO, - "<font color=green>(%P|%t) Line: %d, File: %s\n</font>", + "<font color=green> (%P|%t) Line: %d, File: %s\n</font>", __LINE__, __FILE__)); ACE_UNUSED_ARG (timeout); - <font color=red>// we don't have anything to do, so release() the message.</font> + <font color=red>// we don't have anything to do, so release () the message.</font> ACE_DEBUG ((LM_DEBUG, - "<font color=green>(%P|%t) %s <font color=#008888>EndTask::put</font>() -- releasing Message_Block\n</font>", + "<font color=green> (%P|%t) %s <font +color=#008888>EndTask::put</font> () -- releasing Message_Block\n</font>", this->nameOfTask ())); message->release (); return 0; diff --git a/docs/tutorials/014/page05.html b/docs/tutorials/014/page05.html index 3fdb16a0be7..011cebcdd97 100644 --- a/docs/tutorials/014/page05.html +++ b/docs/tutorials/014/page05.html @@ -14,7 +14,7 @@ <P> <HR WIDTH="100%"> <P> -Now we come to main(). In the previous task-chain tutorial +Now we come to main (). In the previous task-chain tutorial every thread pool had to have the same number of threads. This time around, we leverage the construction method of ACE_Stream and ACE_Module to customize the thread-pool size in each @@ -25,14 +25,14 @@ Remember EndTask from the previous page? We create one here and push Technically, we could have replaced the default Tail task created by the ACE framework but it seems to make more sense to just push our "tail" onto the stream like the other tasks. The - caveat to this method is that you must be sure you don't push() + caveat to this method is that you must be sure you don't push () any other Modules behind the EndTask! <P> Once the stream of modules containing tasks is all setup then we can - put() some data into the stream for processing. The clever use - of Task::close() makes shutting downt the stream easier than + put () some data into the stream for processing. The clever use + of Task::close () makes shutting downt the stream easier than ever. No messing with hangup messages at the application level, - just close() when you're done! What could be simpler? + just close () when you're done! What could be simpler? <P> <HR WIDTH="100%"> <PRE> @@ -62,9 +62,10 @@ typedef ACE_Stream<ACE_MT_SYNCH> Stream; <font color=red>// Just to avoid a lot of typing, typedefs</font> <font color=red>// are generally a good idea.</font> -int main(int argc, char *argv[]) +int main (int argc, char *argv[]) { - int numberOfMessages = argc > 1 ? <font color=#008888>ACE_OS::atoi</font>(argv[1]) : 3; + int numberOfMessages = argc > 1 ? <font +color=#008888>ACE_OS::atoi</font> (argv[1]) : 3; <font color=red>// unless otherwise specified, just send three messages</font> <font color=red>// down the stream.</font> @@ -87,15 +88,15 @@ int main(int argc, char *argv[]) <font color=red>// Out Task's take two arguments: a name, and the number</font> <font color=red>// of threads to dedicate to the task.</font> - taskOne = new Task("<font color=green>Task No. 1</font>", 1); - taskTwo = new Task("<font color=green>Task No. 2</font>", 3); - taskThree = new Task("<font color=green>Task No. 3</font>", 7); - taskFour = new Task("<font color=green>Task No. 4</font>", 1); + taskOne = new Task ("<font color=green>Task No. 1</font>", 1); + taskTwo = new Task ("<font color=green>Task No. 2</font>", 3); + taskThree = new Task ("<font color=green>Task No. 3</font>", 7); + taskFour = new Task ("<font color=green>Task No. 4</font>", 1); <font color=red>// Our EndTask only takes 1 argument, as it actually</font> <font color=red>// doesn't spawn any threads for processing.</font> - taskEnd = new EndTask("<font color=green>End Task</font>"); + taskEnd = new EndTask ("<font color=green>End Task</font>"); Module *moduleOne; Module *moduleTwo; @@ -112,49 +113,49 @@ int main(int argc, char *argv[]) <font color=red>// so we'll only actually install a Task into the write</font> <font color=red>// side of the module, effectively downstream.</font> - moduleOne = new Module("<font color=green>Module No. 1</font>", taskOne); - moduleTwo = new Module("<font color=green>Module No. 2</font>", taskTwo); - moduleThree = new Module("<font color=green>Module No. 3</font>", taskThree); - moduleFour = new Module("<font color=green>Module No. 4</font>", taskFour); - moduleEnd = new Module("<font color=green>Module End</font>", taskEnd); + moduleOne = new Module ("<font color=green>Module No. 1</font>", taskOne); + moduleTwo = new Module ("<font color=green>Module No. 2</font>", taskTwo); + moduleThree = new Module ("<font color=green>Module No. 3</font>", taskThree); + moduleFour = new Module ("<font color=green>Module No. 4</font>", taskFour); + moduleEnd = new Module ("<font color=green>Module End</font>", taskEnd); <font color=red>// Now we push the Modules onto the Stream.</font> <font color=red>// Pushing adds the module to the head, or</font> <font color=red>// otherwise prepends it to whatever modules</font> <font color=red>// are already installed.</font> - <font color=red>// So, you need to push() the modules on -backwards-</font> + <font color=red>// So, you need to push () the modules on -backwards-</font> <font color=red>// from our viewpoint.</font> - if (theStream.push(moduleEnd) == -1) { + if (theStream.push (moduleEnd) == -1) { ACE_ERROR_RETURN ((LM_ERROR, "<font color=green>%p\n</font>", "<font color=green>push</font>"), -1); } - if (theStream.push(moduleFour) == -1) { + if (theStream.push (moduleFour) == -1) { ACE_ERROR_RETURN ((LM_ERROR, "<font color=green>%p\n</font>", "<font color=green>push</font>"), -1); } <font color=red>// As we push a Module onto the Stream, it gets opened.</font> - <font color=red>// When a Module open()s, it opens the Tasks that it contains.</font> + <font color=red>// When a Module open ()s, it opens the Tasks that it contains.</font> <font color=red>//</font> <font color=red>// Since we cannot provide an argument to this embedded</font> - <font color=red>// call to open(), we supplied specified the number of</font> + <font color=red>// call to open (), we supplied specified the number of</font> <font color=red>// threads in the constructor of our Tasks.</font> - if (theStream.push(moduleThree) == -1) { + if (theStream.push (moduleThree) == -1) { ACE_ERROR_RETURN ((LM_ERROR, "<font color=green>%p\n</font>", "<font color=green>push</font>"), -1); } - if (theStream.push(moduleTwo) == -1) { + if (theStream.push (moduleTwo) == -1) { ACE_ERROR_RETURN ((LM_ERROR, "<font color=green>%p\n</font>", "<font color=green>push</font>"), -1); } - if (theStream.push(moduleOne) == -1) { + if (theStream.push (moduleOne) == -1) { ACE_ERROR_RETURN ((LM_ERROR, "<font color=green>%p\n</font>", "<font color=green>push</font>"), -1); } <font color=red>// Now that the Modules are open, the Tasks threads should</font> - <font color=red>// be launching and entering their svc() loop, so we send</font> + <font color=red>// be launching and entering their svc () loop, so we send</font> <font color=red>// some messages down the Stream.</font> int sent = 1; @@ -163,30 +164,37 @@ int main(int argc, char *argv[]) while (sent <= numberOfMessages) { - <font color=red>// First, create ourselves a Message_Block.</font> - <font color=red>// see Tutorials 10-13 for more information</font> - <font color=red>// about Message_Blocks and Message_Queues.</font> - - message = new ACE_Message_Block(128); + <font color=red>// First, create ourselves a Message_Block. See Tutorials 10-13</font> + <font color=red>// for more information about Message_Blocks and Message_Queues.</font> + <font color=red>// Note the use of the lock_adapter () to ensure proper</font> + <font color=red>// serialization.</font> + ACE_NEW_RETURN (message, + ACE_Message_Block (128, + ACE_Message_Block::MB_DATA, + 0, + 0, + 0, + Task::lock_adapter ()), + -1); <font color=red>// Now, we grab the write-pointer from the Block,</font> - <font color=red>// and sprintf() our text into it.</font> + <font color=red>// and sprintf () our text into it.</font> - <font color=#008888>ACE_OS::sprintf</font>(message->wr_ptr(), "<font color=green>Message No. %d</font>", sent); + <font color=#008888>ACE_OS::sprintf</font> (message->wr_ptr (), "<font color=green>Message No. %d</font>", sent); <font color=red>// All we have to do now is drop the Message_Block</font> <font color=red>// into the Stream.</font> - <font color=red>// It is always a good idea to duplicate() a Message_Block</font> + <font color=red>// It is always a good idea to duplicate () a Message_Block</font> <font color=red>// when you put it into any Message_Queue, as then</font> - <font color=red>// you can always be allowed to release() your copy</font> + <font color=red>// you can always be allowed to release () your copy</font> <font color=red>// without worry.</font> - if (theStream.put(message->duplicate(), 0) == -1) { + if (theStream.put (message->duplicate (), 0) == -1) { ACE_ERROR_RETURN ((LM_ERROR, "<font color=green>%p\n</font>", "<font color=green>put</font>"), -1); } - message->release(); + message->release (); ++sent; } @@ -200,7 +208,7 @@ int main(int argc, char *argv[]) <font color=red>// Task class) until all Message_Blocks have cleared the</font> <font color=red>// entire Stream, and all associated threads have exited.</font> - theStream.close(); + theStream.close (); return 0; } diff --git a/docs/tutorials/014/stream.cpp b/docs/tutorials/014/stream.cpp index 0e2b784b2f8..86505604822 100644 --- a/docs/tutorials/014/stream.cpp +++ b/docs/tutorials/014/stream.cpp @@ -1,13 +1,8 @@ - // $Id$ -// stream.cxx -// // Tutorial regarding a way to use ACE_Stream. // // written by bob mcwhirter (bob@netwrench.com) -// -// #include "Task.h" #include "EndTask.h" @@ -18,15 +13,14 @@ #include <ace/streams.h> // These are the neccessary ACE headers. - typedef ACE_Module<ACE_MT_SYNCH> Module; typedef ACE_Stream<ACE_MT_SYNCH> Stream; // Just to avoid a lot of typing, typedefs // are generally a good idea. -int main(int argc, char *argv[]) +int main (int argc, char *argv[]) { - int numberOfMessages = argc > 1 ? ACE_OS::atoi(argv[1]) : 3; + int numberOfMessages = argc > 1 ? ACE_OS::atoi (argv[1]) : 3; // unless otherwise specified, just send three messages // down the stream. @@ -49,15 +43,15 @@ int main(int argc, char *argv[]) // Out Task's take two arguments: a name, and the number // of threads to dedicate to the task. - taskOne = new Task("Task No. 1", 1); - taskTwo = new Task("Task No. 2", 3); - taskThree = new Task("Task No. 3", 7); - taskFour = new Task("Task No. 4", 1); + taskOne = new Task ("Task No. 1", 1); + taskTwo = new Task ("Task No. 2", 3); + taskThree = new Task ("Task No. 3", 7); + taskFour = new Task ("Task No. 4", 1); // Our EndTask only takes 1 argument, as it actually // doesn't spawn any threads for processing. - taskEnd = new EndTask("End Task"); + taskEnd = new EndTask ("End Task"); Module *moduleOne; Module *moduleTwo; @@ -74,49 +68,49 @@ int main(int argc, char *argv[]) // so we'll only actually install a Task into the write // side of the module, effectively downstream. - moduleOne = new Module("Module No. 1", taskOne); - moduleTwo = new Module("Module No. 2", taskTwo); - moduleThree = new Module("Module No. 3", taskThree); - moduleFour = new Module("Module No. 4", taskFour); - moduleEnd = new Module("Module End", taskEnd); + moduleOne = new Module ("Module No. 1", taskOne); + moduleTwo = new Module ("Module No. 2", taskTwo); + moduleThree = new Module ("Module No. 3", taskThree); + moduleFour = new Module ("Module No. 4", taskFour); + moduleEnd = new Module ("Module End", taskEnd); // Now we push the Modules onto the Stream. // Pushing adds the module to the head, or // otherwise prepends it to whatever modules // are already installed. - // So, you need to push() the modules on -backwards- + // So, you need to push () the modules on -backwards- // from our viewpoint. - if (theStream.push(moduleEnd) == -1) { + if (theStream.push (moduleEnd) == -1) { ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), -1); } - if (theStream.push(moduleFour) == -1) { + if (theStream.push (moduleFour) == -1) { ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), -1); } // As we push a Module onto the Stream, it gets opened. - // When a Module open()s, it opens the Tasks that it contains. + // When a Module open ()s, it opens the Tasks that it contains. // // Since we cannot provide an argument to this embedded - // call to open(), we supplied specified the number of + // call to open (), we supplied specified the number of // threads in the constructor of our Tasks. - if (theStream.push(moduleThree) == -1) { + if (theStream.push (moduleThree) == -1) { ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), -1); } - if (theStream.push(moduleTwo) == -1) { + if (theStream.push (moduleTwo) == -1) { ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), -1); } - if (theStream.push(moduleOne) == -1) { + if (theStream.push (moduleOne) == -1) { ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), -1); } // Now that the Modules are open, the Tasks threads should - // be launching and entering their svc() loop, so we send + // be launching and entering their svc () loop, so we send // some messages down the Stream. int sent = 1; @@ -125,30 +119,37 @@ int main(int argc, char *argv[]) while (sent <= numberOfMessages) { - // First, create ourselves a Message_Block. - // see Tutorials 10-13 for more information - // about Message_Blocks and Message_Queues. - - message = new ACE_Message_Block(128); + // First, create ourselves a Message_Block. See Tutorials 10-13 + // for more information about Message_Blocks and Message_Queues. + // Note the use of the lock_adapter () to ensure proper + // serialization. + ACE_NEW_RETURN (message, + ACE_Message_Block (128, + ACE_Message_Block::MB_DATA, + 0, + 0, + 0, + Task::lock_adapter ()), + -1); // Now, we grab the write-pointer from the Block, - // and sprintf() our text into it. + // and sprintf () our text into it. - ACE_OS::sprintf(message->wr_ptr(), "Message No. %d", sent); + ACE_OS::sprintf (message->wr_ptr (), "Message No. %d", sent); // All we have to do now is drop the Message_Block // into the Stream. - // It is always a good idea to duplicate() a Message_Block + // It is always a good idea to duplicate () a Message_Block // when you put it into any Message_Queue, as then - // you can always be allowed to release() your copy + // you can always be allowed to release () your copy // without worry. - if (theStream.put(message->duplicate(), 0) == -1) { + if (theStream.put (message->duplicate (), 0) == -1) { ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "put"), -1); } - message->release(); + message->release (); ++sent; } @@ -162,7 +163,7 @@ int main(int argc, char *argv[]) // Task class) until all Message_Blocks have cleared the // entire Stream, and all associated threads have exited. - theStream.close(); + theStream.close (); return 0; } |