summaryrefslogtreecommitdiff
path: root/examples/Threads/barrier2.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'examples/Threads/barrier2.cpp')
-rw-r--r--examples/Threads/barrier2.cpp317
1 files changed, 0 insertions, 317 deletions
diff --git a/examples/Threads/barrier2.cpp b/examples/Threads/barrier2.cpp
deleted file mode 100644
index c621190ad0b..00000000000
--- a/examples/Threads/barrier2.cpp
+++ /dev/null
@@ -1,317 +0,0 @@
-// $Id$
-
-// This test program illustrates how the ACE task workers/barrier
-// synchronization mechanisms work in conjunction with the ACE_Task
-// and the ACE_Thread_Manager. The manual flag not set simulates user
-// input, if set input comes from stdin until RETURN only is entered
-// which stops all workers via a message block of length 0. This is an
-// alernative shutdown of workers compared to queue deactivate. The
-// delay_put flag simulates a delay between the shutdown puts. All
-// should work with this flag disabled! The BARRIER_TYPE is supposed
-// to enable/disable barrier sync on each svc a worker has done.
-
-#include "ace/Task.h"
-#include "ace/Service_Config.h"
-
-ACE_RCSID(Threads, barrier2, "$Id$")
-
-#if defined (ACE_HAS_THREADS)
-
-#define BARRIER_TYPE ACE_Null_Barrier
-
-template <class BARRIER>
-class Worker_Task : public ACE_Task<ACE_MT_SYNCH>
-{
-public:
- Worker_Task (ACE_Thread_Manager *thr_mgr,
- int n_threads,
- int inp_serialize = 1);
-
- virtual int producer (void);
- // produce input for workers
-
- virtual int input (ACE_Message_Block *mb);
- // Fill one message block via a certain input strategy.
-
- virtual int output (ACE_Message_Block *mb);
- // Forward one message block via a certain output strategy to the
- // next task if any.
-
- virtual int service (ACE_Message_Block *mb, int iter);
- // Perform one message block dependant service.
-
-private:
- virtual int put (ACE_Message_Block *mb, ACE_Time_Value *tv=0);
-
- virtual int svc (void);
- // Iterate <n_iterations> time printing off a message and "waiting"
- // for all other threads to complete this iteration.
-
- // = Not needed for this test.
- virtual int open (void *) { return 0; }
- virtual int close (u_long)
- {
- ACE_DEBUG ((LM_DEBUG,
- "(%t) in close of worker\n"));
- return 0;
- }
-
- int nt_;
- // Number of worker threads to run.
-
- int inp_serialize_;
-
- BARRIER barrier_;
-};
-
-template <class BARRIER>
-Worker_Task<BARRIER>::Worker_Task (ACE_Thread_Manager *thr_mgr,
- int n_threads,
- int inp_serialize)
- : ACE_Task<ACE_MT_SYNCH> (thr_mgr),
- barrier_ (n_threads)
-{
- nt_ = n_threads;
-
- // Create worker threads.
- inp_serialize_ = inp_serialize;
-
- // Use the task's message queue for serialization (default) or run
- // service in the context of the caller thread.
-
- if (nt_ > 0 && inp_serialize == 1)
- if (this->activate (THR_NEW_LWP, n_threads) == -1)
- ACE_ERROR ((LM_ERROR,
- "%p\n",
- "activate failed"));
-}
-
-// Simply enqueue the Message_Block into the end of the queue.
-
-template <class BARRIER> int
-Worker_Task<BARRIER>::put (ACE_Message_Block *mb,
- ACE_Time_Value *tv)
-{
- int result;
-
- if (this->inp_serialize_)
- result = this->putq (mb, tv);
- else
- {
- static int iter = 0;
- result = this->service (mb, iter++);
-
- if (this->output (mb) < 0)
- ACE_DEBUG ((LM_DEBUG,
- "(%t) output not connected!\n"));
-
- mb->release ();
- }
- return result;
-}
-
-template <class BARRIER> int
-Worker_Task<BARRIER>::service (ACE_Message_Block *mb,
- int iter)
-{
- int length = mb->length ();
-
- if (length > 0)
- {
- ACE_DEBUG ((LM_DEBUG,
- "(%t) in iteration %d len=%d text got:\n",
- iter,
- length));
- ACE_OS::write (ACE_STDOUT,
- mb->rd_ptr (),
- length);
- ACE_DEBUG ((LM_DEBUG,
- "\n"));
- }
- return 0;
-}
-
-// Iterate <n_iterations> time printing off a message and "waiting"
-// for all other threads to complete this iteration.
-
-template <class BARRIER> int
-Worker_Task<BARRIER>::svc (void)
-{
- // Note that the <ACE_Task::svc_run> method automatically adds us to
- // the Thread_Manager when the thread begins.
-
- // Keep looping, reading a message out of the queue, until we get a
- // message with a length == 0, which signals us to quit.
-
- for (int iter = 1; ;iter++)
- {
- ACE_Message_Block *mb = 0;
-
- int result = this->getq (mb);
-
- if (result == -1)
- {
- ACE_ERROR ((LM_ERROR,
- "(%t) in iteration %d\n",
- "error waiting for message in iteration",
- iter));
- break;
- }
-
- int length = mb->length ();
- this->service (mb,iter);
-
- if (length == 0)
- {
- ACE_DEBUG ((LM_DEBUG,
- "(%t) in iteration %d got quit, exit!\n",
- iter));
- mb->release ();
- break;
- }
-
- this->barrier_.wait ();
- this->output (mb);
-
- mb->release ();
- }
-
- // Note that the <ACE_Task::svc_run> method automatically removes us
- // from the Thread_Manager when the thread exits.
-
- return 0;
-}
-
-template <class BARRIER> int
-Worker_Task<BARRIER>::producer (void)
-{
- // Keep reading stdin, until we reach EOF.
-
- for (;;)
- {
- // Allocate a new message.
- ACE_Message_Block *mb;
-
- ACE_NEW_RETURN (mb,
- ACE_Message_Block (BUFSIZ),
- -1);
-
- if (this->input (mb) == -1)
- return -1;
- }
-
- ACE_NOTREACHED (return 0);
-}
-
-template <class BARRIER> int
-Worker_Task<BARRIER>::output (ACE_Message_Block *mb)
-{
- return this->put_next (mb);
-}
-
-template <class BARRIER> int
-Worker_Task<BARRIER>::input (ACE_Message_Block *mb)
-{
- ACE_Message_Block *mb1;
-
-#if !defined (manual)
- static int l = 0;
- char str[] = "kalle";
- ACE_OS::strcpy (mb->rd_ptr (), str);
-
- int n = ACE_OS::strlen (str);
-
- if (l == 1000)
- n = 1;
- l++;
-
- if (l == 0 || (l % 100 == 0))
- ACE_OS::sleep (5);
- if (n <= 1)
-#else
- ACE_DEBUG ((LM_DEBUG,
- "(%t) press chars and enter to put a new message into task queue ...\n"));
- n = ACE_OS::read (ACE_STDIN,
- mb->rd_ptr (),
- mb->size ());
- if (n <= 1)
-#endif /* manual */
- {
- // Send a shutdown message to the waiting threads and exit.
- // cout << "\nvor loop, dump of task msg queue:\n" << endl;
- // this->msg_queue ()->dump ();
-
- for (int i = 0; i < nt_; i++)
- {
- ACE_DEBUG ((LM_DEBUG,
- "(%t) eof, sending block for thread=%d\n",
- i + 1));
-
- ACE_NEW_RETURN (mb1,
- ACE_Message_Block (2),
- -1);
- mb1->length (0);
-
- if (this->put (mb1) == -1)
- ACE_ERROR ((LM_ERROR,
- "(%t) %p\n",
- "put"));
-#if defined (delay_put)
- // this sleep helps to shutdown correctly -> was an error!
- ACE_OS::sleep (1);
-#endif /* delay_put */
- }
- return -1;
- }
- else
- {
- // Send a normal message to the waiting threads and continue
- // producing.
- mb->wr_ptr (n);
-
- if (this->put (mb) == -1)
- ACE_ERROR ((LM_ERROR,
- "(%t) %p\n",
- "put"));
- }
- return 0;
-}
-
-int
-main (int argc, char *argv[])
-{
- int n_threads = argc > 1 ? ACE_OS::atoi (argv[1]) : ACE_DEFAULT_THREADS;
-
- ACE_DEBUG ((LM_DEBUG,
- "(%t) worker threads running=%d\n",
- n_threads));
-
- Worker_Task<BARRIER_TYPE> worker_task (ACE_Thread_Manager::instance (),
- /* n_threads */ 0,
- 0);
- worker_task.producer ();
-
- // Wait for all the threads to reach their exit point.
- ACE_DEBUG ((LM_DEBUG,
- "(%t) waiting with thread manager ...\n"));
-
- ACE_Thread_Manager::instance ()->wait ();
-
- ACE_DEBUG ((LM_DEBUG,
- "(%t) done correct!\n"));
- return 0;
-}
-
-#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
-template class Worker_Task<ACE_Null_Barrier>;
-#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
-#pragma instantiate Worker_Task<ACE_Null_Barrier>
-#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
-#else
-int
-main (int, char *[])
-{
- ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n"));
- return 0;
-}
-#endif /* ACE_HAS_THREADS */