summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorschmidt <douglascraigschmidt@users.noreply.github.com>1998-07-17 22:01:36 +0000
committerschmidt <douglascraigschmidt@users.noreply.github.com>1998-07-17 22:01:36 +0000
commitfff0c99afb5df8dd3d742ad901c366658c6037a0 (patch)
tree1d50392a87581862fd903001844132830291092a
parent9a36bc143fc14cc03f14d4e83d829288588b55be (diff)
downloadATCD-fff0c99afb5df8dd3d742ad901c366658c6037a0.tar.gz
*** empty log message ***
-rw-r--r--examples/Threads/barrier2.cpp177
1 files changed, 108 insertions, 69 deletions
diff --git a/examples/Threads/barrier2.cpp b/examples/Threads/barrier2.cpp
index 940988cf208..ca3a6890f8e 100644
--- a/examples/Threads/barrier2.cpp
+++ b/examples/Threads/barrier2.cpp
@@ -1,17 +1,14 @@
// $Id$
-// generic_worker_task.cpp
-//
// This test program illustrates how the ACE task workers/barrier
// synchronization mechanisms work in conjunction with the ACE_Task
-// and the ACE_Thread_Manager. The manual flag not set simulates
-// user input, if set input comes from stdin until RETURN only is
-// entered which stops all workers via a message block of length
-// 0. This is an alernative shutdown of workers compared to queue
-// deactivate. The delay_put flag simulates a delay between the
-// shutdown puts. All should work with this flag disabled! The
-// BARRIER_TYPE is supposed to enable/disable barrier sync on each svc
-// a worker has done.
+// and the ACE_Thread_Manager. The manual flag not set simulates user
+// input, if set input comes from stdin until RETURN only is entered
+// which stops all workers via a message block of length 0. This is an
+// alernative shutdown of workers compared to queue deactivate. The
+// delay_put flag simulates a delay between the shutdown puts. All
+// should work with this flag disabled! The BARRIER_TYPE is supposed
+// to enable/disable barrier sync on each svc a worker has done.
#include "ace/Task.h"
#include "ace/Service_Config.h"
@@ -19,20 +16,16 @@
#if defined (ACE_HAS_THREADS)
#define BARRIER_TYPE ACE_Null_Barrier
-//#define BARRIER_TYPE ACE_Barrier
-//#ifdef delay_put
-//#define manual
template <class BARRIER>
class Worker_Task : public ACE_Task<ACE_MT_SYNCH>
{
public:
-
Worker_Task (ACE_Thread_Manager *thr_mgr,
int n_threads,
int inp_serialize = 1);
- virtual int Producer (void);
+ virtual int producer (void);
// produce input for workers
virtual int input (ACE_Message_Block *mb);
@@ -54,10 +47,16 @@ private:
// = Not needed for this test.
virtual int open (void *) { return 0; }
- virtual int close (u_long) {ACE_DEBUG ((LM_DEBUG,"(%t) in close of worker\n")); return 0; }
+ virtual int close (u_long)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) in close of worker\n"));
+ return 0;
+ }
int nt_;
// Number of worker threads to run.
+
int inp_serialize_;
BARRIER barrier_;
@@ -71,6 +70,7 @@ Worker_Task<BARRIER>::Worker_Task (ACE_Thread_Manager *thr_mgr,
barrier_ (n_threads)
{
nt_ = n_threads;
+
// Create worker threads.
inp_serialize_ = inp_serialize;
@@ -79,15 +79,19 @@ Worker_Task<BARRIER>::Worker_Task (ACE_Thread_Manager *thr_mgr,
if (nt_ > 0 && inp_serialize == 1)
if (this->activate (THR_NEW_LWP, n_threads) == -1)
- ACE_ERROR ((LM_ERROR, "%p\n", "activate failed"));
+ ACE_ERROR ((LM_ERROR,
+ "%p\n",
+ "activate failed"));
}
// Simply enqueue the Message_Block into the end of the queue.
template <class BARRIER> int
-Worker_Task<BARRIER>::put (ACE_Message_Block *mb, ACE_Time_Value *tv)
+Worker_Task<BARRIER>::put (ACE_Message_Block *mb,
+ ACE_Time_Value *tv)
{
int result;
+
if (this->inp_serialize_)
result = this->putq (mb, tv);
else
@@ -96,7 +100,8 @@ Worker_Task<BARRIER>::put (ACE_Message_Block *mb, ACE_Time_Value *tv)
result = this->service (mb, iter++);
if (this->output (mb) < 0)
- ACE_DEBUG ((LM_DEBUG, "(%t) output not connected!\n"));
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) output not connected!\n"));
mb->release ();
}
@@ -104,15 +109,22 @@ Worker_Task<BARRIER>::put (ACE_Message_Block *mb, ACE_Time_Value *tv)
}
template <class BARRIER> int
-Worker_Task<BARRIER>::service (ACE_Message_Block *mb, int iter)
+Worker_Task<BARRIER>::service (ACE_Message_Block *mb,
+ int iter)
{
int length = mb->length ();
if (length > 0)
{
- ACE_DEBUG ((LM_DEBUG,"(%t) in iteration %d len=%d text got:\n",iter,length));
- ACE_OS::write (ACE_STDOUT, mb->rd_ptr (), length);
- ACE_DEBUG ((LM_DEBUG,"\n"));
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) in iteration %d len=%d text got:\n",
+ iter,
+ length));
+ ACE_OS::write (ACE_STDOUT,
+ mb->rd_ptr (),
+ length);
+ ACE_DEBUG ((LM_DEBUG,
+ "\n"));
}
return 0;
}
@@ -123,8 +135,8 @@ Worker_Task<BARRIER>::service (ACE_Message_Block *mb, int iter)
template <class BARRIER> int
Worker_Task<BARRIER>::svc (void)
{
- // Note that the ACE_Task::svc_run () method automatically adds us
- // to the Thread_Manager when the thread begins.
+ // Note that the <ACE_Task::svc_run> method automatically adds us to
+ // the Thread_Manager when the thread begins.
// Keep looping, reading a message out of the queue, until we get a
// message with a length == 0, which signals us to quit.
@@ -138,7 +150,9 @@ Worker_Task<BARRIER>::svc (void)
if (result == -1)
{
ACE_ERROR ((LM_ERROR,
- "(%t) in iteration %d\n", "error waiting for message in iteration", iter));
+ "(%t) in iteration %d\n",
+ "error waiting for message in iteration",
+ iter));
break;
}
@@ -147,7 +161,9 @@ Worker_Task<BARRIER>::svc (void)
if (length == 0)
{
- ACE_DEBUG ((LM_DEBUG, "(%t) in iteration %d got quit, exit!\n", iter));
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) in iteration %d got quit, exit!\n",
+ iter));
mb->release ();
break;
}
@@ -158,79 +174,103 @@ Worker_Task<BARRIER>::svc (void)
mb->release ();
}
- // Note that the ACE_Task::svc_run () method automatically removes
- // us from the Thread_Manager when the thread exits.
+ // Note that the <ACE_Task::svc_run> method automatically removes us
+ // from the Thread_Manager when the thread exits.
return 0;
}
template <class BARRIER> int
-Worker_Task<BARRIER>::Producer (void)
+Worker_Task<BARRIER>::producer (void)
{
// Keep reading stdin, until we reach EOF.
for (;;)
{
// Allocate a new message.
- ACE_Message_Block *mb = new ACE_Message_Block (BUFSIZ);
+ ACE_Message_Block *mb;
+
+ ACE_NEW_RETURN (mb,
+ ACE_Message_Block (BUFSIZ),
+ -1);
if (this->input (mb) == -1)
return -1;
}
- ACE_NOTREACHED(return 0);
+ ACE_NOTREACHED (return 0);
}
-template <class BARRIER>int
+template <class BARRIER> int
Worker_Task<BARRIER>::output (ACE_Message_Block *mb)
{
return this->put_next (mb);
}
-template <class BARRIER>int
+template <class BARRIER> int
Worker_Task<BARRIER>::input (ACE_Message_Block *mb)
{
ACE_Message_Block *mb1;
-#ifndef manual
- static int l= 0;
- char str[]="kalle";
- strcpy (mb->rd_ptr (),str);
- int n=strlen (str);
- if (l==1000)
- n=1;
+#if !defined (manual)
+ static int l = 0;
+ char str[] = "kalle";
+ ACE_OS::strcpy (mb->rd_ptr (), str);
+
+ int n = ACE_OS::strlen (str);
+
+ if (l == 1000)
+ n = 1;
l++;
- if (l==0 || (l%100 == 0)) ACE_OS::sleep (5);
+
+ if (l == 0 || (l % 100 == 0))
+ ACE_OS::sleep (5);
if (n <= 1)
#else
- ACE_DEBUG ((LM_DEBUG,"(%t) press chars and enter to put a new message into task queue ...\n"));
- if ((n = read (0, mb->rd_ptr (), mb->size ())) <= 1)
-#endif // manual
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) press chars and enter to put a new message into task queue ...\n"));
+ n = ACE_OS::read (ACE_STDIN,
+ mb->rd_ptr (),
+ mb->size ());
+ if (n <= 1)
+#endif /* manual */
{
// Send a shutdown message to the waiting threads and exit.
// cout << "\nvor loop, dump of task msg queue:\n" << endl;
// this->msg_queue ()->dump ();
- for (int i=0;i<nt_;i++)
+
+ for (int i = 0; i < nt_; i++)
{
- ACE_DEBUG ((LM_DEBUG,"(%t) eof, sending block for thread=%d\n",i+1));
- mb1 = new ACE_Message_Block (2);
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) eof, sending block for thread=%d\n",
+ i + 1));
+
+ ACE_NEW_RETURN (mb1,
+ ACE_Message_Block (2),
+ -1);
mb1->length (0);
+
if (this->put (mb1) == -1)
- ACE_ERROR ((LM_ERROR, "(%t) %p\n", "put"));
-#ifdef delay_put
- ACE_OS::sleep (1); // this sleep helps to shutdown correctly -> was an error!
+ ACE_ERROR ((LM_ERROR,
+ "(%t) %p\n",
+ "put"));
+#if defined (delay_put)
+ // this sleep helps to shutdown correctly -> was an error!
+ ACE_OS::sleep (1);
#endif /* delay_put */
}
- // cout << "\nnach loop, dump of task msg queue:\n" << endl;
- // this->msg_queue ()->dump ();
- return (-1);
+ return -1;
}
else
{
- // Send a normal message to the waiting threads and continue producing.
+ // Send a normal message to the waiting threads and continue
+ // producing.
mb->wr_ptr (n);
+
if (this->put (mb) == -1)
- ACE_ERROR ((LM_ERROR, "(%t) %p\n", "put"));
+ ACE_ERROR ((LM_ERROR,
+ "(%t) %p\n",
+ "put"));
}
return 0;
}
@@ -240,22 +280,23 @@ main (int argc, char *argv[])
{
int n_threads = argc > 1 ? ACE_OS::atoi (argv[1]) : ACE_DEFAULT_THREADS;
- ACE_DEBUG ((LM_DEBUG,"(%t) worker threads running=%d\n",n_threads));
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) worker threads running=%d\n",
+ n_threads));
-
- Worker_Task<BARRIER_TYPE> *worker_task =
- new Worker_Task<BARRIER_TYPE> (ACE_Thread_Manager::instance (),
- /*n_threads*/ 0,0);
-
- worker_task->Producer ();
+ Worker_Task<BARRIER_TYPE> worker_task (ACE_Thread_Manager::instance (),
+ /* n_threads */ 0,
+ 0);
+ worker_task->producer ();
// Wait for all the threads to reach their exit point.
- ACE_DEBUG ((LM_DEBUG,"(%t) waiting with thread manager ...\n"));
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) waiting with thread manager ...\n"));
+
ACE_Thread_Manager::instance ()->wait ();
- ACE_DEBUG ((LM_DEBUG,"(%t) delete worker task ...\n"));
- delete worker_task;
- ACE_DEBUG ((LM_DEBUG,"(%t) done correct!\n"));
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) done correct!\n"));
return 0;
}
@@ -264,8 +305,6 @@ template class Worker_Task<ACE_Null_Barrier>;
#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
#pragma instantiate Worker_Task<ACE_Null_Barrier>
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
-
-
#else
int
main (int, char *[])