summaryrefslogtreecommitdiff
path: root/docs/tutorials/010/task.cpp
blob: 238b188272773aa629532345d71e58ef393445e1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
// $Id$

#include "task.h"
#include "block.h"

/* Set our housekeeping pointer to NULL and tell the user we exist.  */
Task::Task (size_t n_threads)
  : barrier_ (n_threads),
    n_threads_ (n_threads)
{
  ACE_DEBUG ((LM_DEBUG,
              "(%P|%t) Task ctor 0x%x\n",
              (void *) this));
}

/* Take care of cleanup & tell the user we're going away.  */
Task::~Task (void)
{
  ACE_DEBUG ((LM_DEBUG,
              "(%P|%t) Task dtor 0x%x\n",
              (void *) this));

  /* Get our shutdown notification out of the queue and release it.  */
  ACE_Message_Block *message;

  /* Like the getq() in svc() below, this will block until a message
    arrives.  By blocking, we know that the destruction will be paused
    until the last thread is done with the message block.  */
  this->getq (message);
  message->release ();
}

/* Open the object to do work.  Next, we activate the Task into the
  number of requested threads.  */
int
Task::open (void *unused)
{
  ACE_UNUSED_ARG (unused);

  return this->activate (THR_NEW_LWP,
                         n_threads_);
}

/* Tell the user we're closing and invoke the baseclass' close() to
  take care of things.  */
int
Task::close (u_long flags)
{
  ACE_DEBUG ((LM_DEBUG,
              "(%P|%t) Task close 0x%x\n",
              (void *) this));
  return inherited::close (flags);
}

/* Our svc() method waits for work on the queue and then processes
  that work.  */
int
Task::svc (void)
{
  /* This will cause all of the threads to wait on this line until all
    have invoked this method.  The net result is that no thread in the
    Task will get a shot at the queue until all of the threads are
    active.  There's no real need to do this but it's an easy intro
    into the use of ACE_Barrier.  */
  this->barrier_.wait ();

  ACE_DEBUG ((LM_DEBUG,
              "(%P|%t) Task 0x%x starts in thread %d\n",
              (void *) this,
              ACE_Thread::self ()));

  /* Remember that get() needs a reference to a pointer.  To save
    stack thrashing we'll go ahead and create a pointer outside of the
    almost- infinite loop.  */
  ACE_Message_Block *message;

  for (;;)
    {
      /* Get a message from the queue.  Note that getq() will block
        until a message shows up.  That makes us very
        processor-friendly.  */
      if (this->getq (message) == -1)
        ACE_ERROR_RETURN ((LM_ERROR,
                           "%p\n",
                           "getq"),
                          -1);
      /* If we got the shutdown request, we need to go away.  */
      if (message->msg_type () == ACE_Message_Block::MB_HANGUP)
        {
          /* Forward the request to any peer threads.  */
          this->putq (message);

          /* Leave the infinite loop so that the thread exits.  */
          break;
        }

      /* The message queue stores char* data.  We use rd_ptr() to get
        to the beginning of the data.  */
      const char *cp = message->rd_ptr ();

      /* Move the rd_ptr() past the data we read.  This isn't real
        useful here since we won't be reading any more from the block
        but it's a good habit to get into.  */
      message->rd_ptr (ACE_OS::strlen (cp));

      /* Display the block's address and data to the user.  */
      ACE_DEBUG ((LM_DEBUG,
                  "(%P|%t) Block 0x%x contains (%s)\n",
                  (void *) message,
                  cp));

      /* Pretend that it takes a while to process the data.  */
      ACE_OS::sleep (ACE_Time_Value (0, 5000));

      /* Release the message block.  Notice that we never delete a
        message block.  Blocks are reference counted & the release()
        method will take care of the delete when there are no more
        references to the data.  */
      message->release ();
    }

  return 0;
}