summaryrefslogtreecommitdiff
path: root/docs/tutorials/013/page06.html
blob: 4e2ccf5c255381ce9c86853001ce56f501ce412e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
<HTML>
<HEAD>
   <META HTTP-EQUIV="Content-Type" CONTENT="text/html; charset=iso-8859-1">
   <META NAME="Author" CONTENT="James CE Johnson">
   <TITLE>ACE Tutorial 013</TITLE>
</HEAD>
<BODY TEXT="#000000" BGCOLOR="#FFFFFF" LINK="#000FFF" VLINK="#FF0F0F">

<CENTER><B><FONT SIZE=+2>ACE Tutorial 013</FONT></B></CENTER>

<CENTER><B><FONT SIZE=+2>Multiple thread pools</FONT></B></CENTER>


<P>
<HR WIDTH="100%">
<P>
Let's take a look now at the new Task object.  This will obviously be
different from the Tasks we've created before but I think you'll be
surprised at how relatively simple it actually is.
<P>
Remember that the goal of this tutorial was to use the reference
counting abilities of the ACE_Data_Block.  The only way to show that
effectively is to have a data block passed between different threads.
A thread pool isn't really going to do that so, instead, our new Task
can be part of a chain of tasks.  In that way, each Task can pass the
data on to another and satisfy our need for moving the ACE_Data_Block
around.
If we've done the reference counting correctly then none of our tasks
will be trying to work with deleted data and we won't have any memory
leaks at the end.
<P>
There's not much to the header, so I've included it and the cpp file
on this one page.
<P>
<HR WIDTH="100%">
<HR width=50%><P><center>task.h</center><HR width=50%>
<PRE>

<font color=red>// $Id$</font>

<font color=blue>#ifndef</font> <font color=purple>TASK_H</font>
<font color=blue>#define</font> <font color=purple>TASK_H</font>

<font color=blue>#include</font> "<A HREF="../../../ace/Task.h">ace/Task.h</A>"

<font color=blue>#if !defined</font> (<font color=purple>ACE_LACKS_PRAGMA_ONCE</font>)
<font color=blue># pragma</font> <font color=purple>once</font>
<font color=blue>#endif</font> <font color=red>/* ACE_LACKS_PRAGMA_ONCE */</font>

<font color=blue>#include</font> "<font color=green>mld.h</font>"

<font color=red>/*
   This is much like the Task we've used in the past for implementing a thread
   pool.  This time, however, I've made the Task an element in a singly-linked
   list.  As the svc() method finishes the process() on a unit of work, it
   will enqueue that unit of work to the next_ Task if there is one.  If the
   Task does not have a next_ Task, it will invoke the unit of work object's
   fini() method after invoking process().
 */</font>
class Task : public ACE_Task &lt; ACE_MT_SYNCH >
{
public:

    typedef ACE_Task &lt; ACE_MT_SYNCH > inherited;

        <font color=red>// Construct ourselves and an optional number of subtasks</font>
        <font color=red>// chained beyond us.</font>
    Task (int sub_tasks = 0);
    ~Task (void);

        <font color=red>/*
          I really wanted this to be called open() but that was already
          claimed by the Task framework.  start() will kick off our thread 
          pool for us.
        */</font>
    int start (int threads = 1);

        <font color=red>// Take Unit_Of_Work objects from our message queue and invoke</font>
        <font color=red>// their process() and/or fini() as appropriate.</font>
    int svc (void);

        <font color=red>// Shut down the thread pool and its associated subtasks</font>
    int close (u_long flags = 0);

        <font color=red>// Wait for the pool and subtasks to close</font>
    int wait (void);

protected:
    ACE_Barrier * barrier_;
    Task *next_;
    MLD;
};

<font color=blue>#endif</font>
</PRE>
<HR width=50%><P><center>task.cpp</center><HR width=50%>
<PRE>

<font color=red>// $Id$</font>

<font color=blue>#include</font> "<font color=green>task.h</font>"
<font color=blue>#include</font> "<font color=green>block.h</font>"
<font color=blue>#include</font> "<font color=green>work.h</font>"

<font color=red>/*
   Construct the Task with zero or more subtasks.  If subtasks are requested,
   we assign our next_ pointer to the first of those and let it worry about
   any remaining subtasks. 
 */</font>
<font color=#008888>Task::Task</font> (int sub_tasks)
        : barrier_ (0)
         ,next_ (0)
{
    ACE_DEBUG ((LM_DEBUG, "<font color=green>(%P|%t) Task ctor 0x%x\n</font>", (void *) this));
    if (sub_tasks)
    {
        next_ = new Task (--sub_tasks);
    }
}

<font color=red>/*
   Delete our barrier object and any subtasks we may have. 
 */</font>
<font color=#008888>Task::~Task</font> (void)
{
    ACE_DEBUG ((LM_DEBUG, "<font color=green>(%P|%t) Task dtor 0x%x\n</font>", (void *) this));

    delete barrier_;
    delete next_;
}

<font color=red>/*
   Open our thread pool with the requested number of threads.  If subtasks are
   enabled, they inherit the thread-pool size.  Make sure that the subtasks can 
   be opened before we open our own threadpool. 
 */</font>
int <font color=#008888>Task::start</font> (int threads)
{
    if (next_)
    {
        if (next_->start (threads) == -1)
        {
            return -1;
        }
    }

    barrier_ = new ACE_Barrier (threads);
    return this->activate (THR_NEW_LWP, threads);
}

<font color=red>/*
   Close ourselves and any subtasks.  This just prints a message so that we can 
   assure ourselves things are cleaned up correctly. 
 */</font>
int <font color=#008888>Task::close</font> (u_long flags)
{
    ACE_DEBUG ((LM_DEBUG, "<font color=green>(%P|%t) Task close 0x%x\n</font>", (void *) this));
    if (next_)
    {
        next_->close (flags);
    }

    return (0);
}

<font color=red>/*
   Wait for all of the threads in our pool to exit and then wait for any
   subtasks.  When called from the front of the task chain, this won't return
   until all thread pools in the chain have exited. 
 */</font>
int <font color=#008888>Task::wait</font> (void)
{
    <font color=#008888>inherited::wait</font> ();
    if (next_)
    {
        next_->wait ();
    }
    return (0);
}

<font color=red>/*
   Like the thread-pools before, this is where all of the work is done. 
 */</font>
int <font color=#008888>Task::svc</font> (void)
{
        <font color=red>// Wait for all threads to get this far before continuing.</font>
    this->barrier_->wait ();

    ACE_DEBUG ((LM_DEBUG, "<font color=green>(%P|%t) Task 0x%x starts in thread %u\n</font>", (void *) this, <font color=#008888>ACE_Thread::self</font> ()));

        <font color=red>// getq() wants an ACE_Message_Block so we'll start out with one</font>
        <font color=red>// of those.  We could do some casting (or even auto-casting) to</font>
        <font color=red>// avoid the extra variable but I prefer to be clear about our actions.</font>
    ACE_Message_Block *message;
  
        <font color=red>// What we really put into the queue was our Message_Block.</font>
        <font color=red>// After we get the message from the queue, we'll cast it to this </font>
        <font color=red>// so that we know how to work on it.</font>
    Message_Block *message_block;
  
        <font color=red>// And, of course, our Message_Block contains our Data_Block</font>
        <font color=red>// instead of the typical ACE_Data_Block</font>
    Data_Block *data_block;
  
      <font color=red>// Even though we put Work objects into the queue, we take them</font>
      <font color=red>// out using the baseclass pointer.  This allows us to create new </font>
      <font color=red>// derivatives without having to change this svc() method.</font>
    Unit_Of_Work *work;

    while (1)
    {
            <font color=red>// Get the ACE_Message_Block</font>
        if (this->getq (message) == -1)
        {
            ACE_ERROR_RETURN ((LM_ERROR, "<font color=green>%p\n</font>", "<font color=green>getq</font>"), -1);
        }

            <font color=red>// "Convert" it to our Message_Block</font>
        message_block = (Message_Block *) message;

            <font color=red>// Get the ACE_Data_Block and "convert" to Data_Block in one step.</font>
        data_block = (Data_Block *) (message_block->data_block ());

            <font color=red>// Get the unit of work from the data block</font>
        work = data_block->data ();

            <font color=red>// Show the object's instance value and "type name"</font>
        work->who_am_i ();
        work->what_am_i ();

            <font color=red>// If there is a hangup we need to tell our pool-peers as</font>
            <font color=red>// well as any subtasks.</font>
        if (message_block->msg_type () == <font color=#008888>ACE_Message_Block::MB_HANGUP</font>)
        {
                <font color=red>// duplicate()ing the message block will increment the</font>
                <font color=red>// reference counts on the data blocks.  This allows us</font>
                <font color=red>// to safely release() the message block.  The rule of</font>
                <font color=red>// thumb is that if you pass a message block to a new</font>
                <font color=red>// owner, duplicate() it.  Then you can release() when</font>
                <font color=red>// you're done and not worry about memory leaks.</font>
            if (this->putq (message_block->duplicate ()) == -1)
            {
                ACE_ERROR_RETURN ((LM_ERROR, "<font color=green>%p\n</font>", "<font color=green>putq</font>"), -1);
            }

                <font color=red>// If we have a subtask, duplicate() the message block</font>
                <font color=red>// again and pass it to that task's queue</font>
            if (next_ && next_->putq (message_block->duplicate ()) == -1)
            {
                ACE_ERROR_RETURN ((LM_ERROR, "<font color=green>%p\n</font>", "<font color=green>putq</font>"), -1);
            }

                <font color=red>// We're now done with our copy of the block, so we can</font>
                <font color=red>// release it.  Our peers/subtasks have their own message </font>
                <font color=red>// block to access the shared data blocks.</font>
            message_block->release ();

            break;
        }

            <font color=red>// If this isn't a hangup/shutdown message then we tell the</font>
            <font color=red>// unit of work to process() for a while.</font>
        work->process ();

        if (next_)
        {
                <font color=red>// If we have subtasks, we pass the block on to them.  Notice</font>
                <font color=red>// that I don't bother to duplicate() the block since I won't </font>
                <font color=red>// release it in this case.  I could have invoked</font>
                <font color=red>// duplicate() in the putq() and then release()</font>
                <font color=red>// afterwards.  Either is acceptable.</font>
            if (next_->putq (message_block) == -1)
                ACE_ERROR_RETURN ((LM_ERROR, "<font color=green>%p\n</font>", "<font color=green>putq</font>"), -1);
        }
        else
        {
                <font color=red>// If we don't have subtasks then invoke fini() to tell</font>
                <font color=red>// the unit of work that we won't be invoking process()</font>
                <font color=red>// any more.  Then release() the block.  This release()</font>
                <font color=red>// would not change if we duplicate()ed in the above conditional</font>
            work->fini ();
            message_block->release ();
        }

            <font color=red>// Pretend that the work takes some time...</font>
        <font color=#008888>ACE_OS::sleep</font> (ACE_Time_Value (0, 250));
    }

    return (0);
}
</PRE>
<HR WIDTH="100%">
<P>
So you see... it wasn't really that much more complicated.  We really
just have to remember to pass the block on to <i>next_</i> when we finish
working on the data.  If your Unit_Of_Work derivative is going to implement a
state machine, be sure that you also implement a fini() method
<em>or</em> ensure that your chain of subtasks is large enough for all 
possible states.
<P>
<P><HR WIDTH="100%">
<CENTER>[<A HREF="../online-tutorials.html">Tutorial Index</A>] [<A HREF="page07.html">Continue This Tutorial</A>]</CENTER>