summaryrefslogtreecommitdiff
path: root/docs/tutorials/010/task.cpp
blob: 84f70cae0ee1ea879cd7b31e71adf156d342c836 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134

// $Id$

#include "task.h"
#include "block.h"

/*
  Set our  housekeeping pointer to NULL and tell the user we exist.
 */
Task::Task (void)
: barrier_ (0)
{
  ACE_DEBUG ((LM_DEBUG, "(%P|%t) Task ctor 0x%x\n", (void *) this));
}

/*
  Take care of cleanup & tell the user we're going away.
*/
Task::~Task (void)
{
  ACE_DEBUG ((LM_DEBUG, "(%P|%t) Task dtor 0x%x\n", (void *) this));

  /*
    Get our shutdown notification out of the queue and release it.
  */
  ACE_Message_Block * message;
  this->getq(message);
  message->release();

  delete barrier_;
}

/*
  Open the object to do work.  We create the Barrier object and tell
  it how many threads we'll be using.  Next, we activate the Task
  into the number of requested threads.
*/
int Task::open (int threads)
{
  barrier_ = new ACE_Barrier (threads);
  return this->activate (THR_NEW_LWP, threads);
}

/*
  Tell the user we're closing and invoke the baseclass' close() to
  take care of things.
*/
int Task::close (u_long flags)
{
  ACE_DEBUG ((LM_DEBUG, "(%P|%t) Task close 0x%x\n", (void *) this));
  return inherited::close (flags);
}

/*
  Our svc() method waits for work on the queue and then processes that work.
 */
int Task::svc (void)
{
  /*
    This will cause all of the threads to wait on this line until all
    have invoked this method.  The net result is that no thread in the
    Task will get a shot at the queue until all of the threads are active.
    There's no real need to do this but it's an easy intro into the use
    of ACE_Barrier.
   */
  this->barrier_->wait ();

  ACE_DEBUG ((LM_DEBUG, "(%P|%t) Task 0x%x starts in thread %d\n", (void *) this, ACE_Thread::self ()));

  /*
    Remember that get() needs a reference to a pointer.  To save stack
    thrashing we'll go ahead and create a pointer outside of the almost-
    infinite loop.
   */
  ACE_Message_Block *message;
  while (1)
  {
    /*
      Get a message from the queue.
    */
    if (this->getq (message) == -1)
    {
      ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "getq"), -1);
    }

    /*
      If we got the shutdown request, we need to go away.
    */
    if (message->msg_type () == ACE_Message_Block::MB_HANGUP)
    {
      /*
        Forward the request to any peer threads.
       */
      this->putq (message);

      /*
        Leave the infinite loop so that the thread exits.
      */
      break;
    }

    /*
      The message queue stores char* data.  We use rd_ptr() to get to
      the beginning of the data.
    */
    const char *cp = message->rd_ptr ();

    /*
      Move the rd_ptr() past the data we read.  This isn't real useful
      here since we won't be reading any more from the block but it's
      a good habit to get into.
    */
    message->rd_ptr( strlen(cp) );
    
    /*
      Display the block's address and data to the user.
    */
    ACE_DEBUG ((LM_DEBUG, "(%P|%t) Block 0x%x contains (%s)\n", (void *) message, cp));

    /*
       Pretend that it takes a while to process the data.
     */
    ACE_OS::sleep (ACE_Time_Value (0, 5000));

    /*
      Release the message block.  Notice that we never delete a message block.
      Blocks are reference counted & the release() method will take care of
      the delete when there are no more references to the data.
    */
    message->release ();
  }

  return (0);
}