summaryrefslogtreecommitdiff
path: root/docs/tutorials/018/page04.html
blob: dcf0ac3ca2bcf71fbea5e3c0f8cab06dbaf00db6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
<HTML>
<HEAD>
   <META HTTP-EQUIV="Content-Type" CONTENT="text/html; charset=iso-8859-1">
   <META NAME="Author" CONTENT="James CE Johnson">
   <TITLE>ACE Tutorial 018</TITLE>
</HEAD>
<BODY TEXT="#000000" BGCOLOR="#FFFFFF" LINK="#000FFF" VLINK="#FF0F0F">

<CENTER><B><FONT SIZE=+2>ACE Tutorial 018</FONT></B></CENTER>

<CENTER><B><FONT SIZE=+2>The FIFO Nature of ACE_Token</FONT></B></CENTER>

<P>
<HR WIDTH="100%">
Our Test implementation looks much like the other ACE_Task derivatives 
we've used in the past.  The most obvious change is the addition of
the run() method.  run() will activate the threads and put a few
messages into the queue.  We could have done that in main() but it
just makes more sense here.
<P>
Notice how svc() grabs the guard after getting a message from the
queue.  Since we constructed our Task baseclass with ACE_MT_SYNCH, we
know that the queue is already thread-safe.  Our purpose in grabbing
the additional lock is to show how ACE_Token and ACE_Mutex behave
differently.  In a real app, you'd be doing this to protect shared
resources that the threads might clobber.
<HR>
<PRE>

<font color=red>// $Id$</font>

<font color=red>/* This is something new... Since we're included by the header, we
   have to provide a sentry to protect against recursive inclusion.
 */</font>
<font color=blue>#ifndef</font> <font color=purple>TEST_T_C</font>
<font color=blue>#define</font> <font color=purple>TEST_T_C</font>

<font color=red>// Get our definition</font>
<font color=blue>#include</font> "<font color=green>Test_T.h</font>"

<font color=red>// We'll hard-code the thread count.  Mucking around with that isn't</font>
<font color=red>// really the point of the exercise today...</font>
<font color=blue>#define</font> <font color=purple>TEST_THREAD_COUNT</font> 5

<font color=red>/* Construction time...
   Initialize the baseclass, the name and the barrier.  Since the
   client will probably invoke run() next, we go ahead and announce our
   creation to make the output more readable.
 */</font>
template &lt;class MUTEX>
Test_T&lt;MUTEX>::Test_T( const char * _name )
        : ACE_Task&lt;ACE_MT_SYNCH>()
         ,name_(_name)
         ,barrier_(TEST_THREAD_COUNT)
{
    ACE_DEBUG ((LM_INFO, "<font color=green>(%P|%t|%T)\tTest_T (%s) created\n</font>", _name ));
}

<font color=red>/* Activate the threads and create some test data...
 */</font>
template &lt;class MUTEX>
int Test_T&lt;MUTEX>::run(void)
{
        <font color=red>// Try to activate the set of threads that will test the mutex</font>
    if( this->open() == -1 )
    {
        return -1;
    }

        <font color=red>// Create a set of messages.  I chose twice the thread count</font>
        <font color=red>// so that we can see how they get distributed.</font>
    for( int i = 0 ; i &lt; TEST_THREAD_COUNT*2 ; ++i )
    {
            <font color=red>// A message block big enough for a simple message.</font>
        ACE_Message_Block * message = new ACE_Message_Block(64);

            <font color=red>// Put some text into the message block so that we can</font>
            <font color=red>// know what's going on when we get to svc()</font>
        sprintf(message->wr_ptr(),"<font color=green>Message Number %d</font>",i);
        message->wr_ptr( strlen(message->rd_ptr())+1 );

            <font color=red>// Send the message to the thread pool</font>
        if( this->send(message) == -1 )
        {
            break;
        }
    }

        <font color=red>// Send a hangup to the thread pool so that we can exit.</font>
    if( this->send() == -1 )
    {
        return -1;
    }

        <font color=red>// Wait for all of the threads to exit and then return to the client.</font>
    return this->wait();
}

<font color=red>/* Send a message to the thread pool
 */</font>
template &lt;class MUTEX>
int Test_T&lt;MUTEX>::send( ACE_Message_Block * _message )
{
        <font color=red>// If no message was provided, create a hangup message.</font>
    if( ! _message )
    {
        _message = new
            ACE_Message_Block(0,<font color=#008888>ACE_Message_Block::MB_HANGUP</font>);
    }

        <font color=red>// Use the duplicate() method when sending the message.  For</font>
        <font color=red>// this simple application, that may be overkill but it's a</font>
        <font color=red>// good habit.  duplicate() will increment the reference count </font>
        <font color=red>// so that each user of the message can release() it when</font>
        <font color=red>// done.  The last user to call release() will cause the data</font>
        <font color=red>// to be deleted.</font>
    if( this->putq(_message->duplicate()) == -1 )
    {
            <font color=red>// Error?  release() the message block and return failure.</font>
        _message->release();
        return -1;
    }

        <font color=red>// release() the data to prevent memory leaks.</font>
    _message->release();

    return 0;
}

<font color=red>/* A fairly typical open().  Just activate the set of threads and return.
 */</font>
template &lt;class MUTEX>
int Test_T&lt;MUTEX>::open( void * _arg )
{
    ACE_UNUSED_ARG(_arg);
    return this->activate(THR_NEW_LWP, TEST_THREAD_COUNT);
}

<font color=red>/* svc() is also fairly typical.  The new part is the use of the guard 
   to simulate protection of shared resources.
 */</font>
template &lt;class MUTEX>
int Test_T&lt;MUTEX>::svc(void)
{
        <font color=red>// Keep a simple thread identifier.  We could always use the</font>
        <font color=red>// thread id but this is a nice, simple number.</font>
    int my_number = ++thread_num_;

    ACE_DEBUG ((LM_INFO, "<font color=green>%d (%P|%t|%T)\tTest_T::svc() Entry\n</font>",
                my_number));

        <font color=red>// Wait for all of the threads to get started so that they all</font>
        <font color=red>// have a fair shot at the message queue.  Comment this out</font>
        <font color=red>// and see how the behaviour changes.  Does it surprise you?</font>
    barrier_.wait();

    ACE_Message_Block * message;
    int mcount = 0;

        <font color=red>// This would usually be an almost-infinite loop.  Instead,</font>
        <font color=red>// I've governed it so that no single thread can get more than </font>
        <font color=red>// "<font color=green>thread count</font>" number of messages.  You'll see that with</font>
        <font color=red>// ACE_Mutex, this is just about the only way to keep the</font>
        <font color=red>// first thread from getting all the action.  This is obviously </font>
        <font color=red>// just for sake of the test since you don't want your</font>
        <font color=red>// real-world app to exit after a fixed number of messages!</font>
    while( mcount &lt; TEST_THREAD_COUNT )
    {
            <font color=red>// Get a message.  Since the message queue is already</font>
            <font color=red>// thread-safe we don't have to guard it.  In fact, moving </font>
            <font color=red>// the guard up above getq() will decrease your</font>
            <font color=red>// parallelization.</font>
        if( getq(message) == -1 )
        {
            break;
        }

            <font color=red>// Now we pretend that there are shared resources required </font>
            <font color=red>// to process the data.  We grab the mutex through the</font>
            <font color=red>// guard and "<font color=green>do work</font>".  In a real application, you'll</font>
            <font color=red>// want to keep these critical sections as small as</font>
            <font color=red>// possible since they will reduce the usefulness of</font>
            <font color=red>// multi-threading.</font>
        guard_t guard(mutex_);

            <font color=red>// Increase our message count for the debug output and the</font>
            <font color=red>// governor.</font>
        ++mcount;

            <font color=red>// Check for a hangup request...</font>
            <font color=red>// Notice the use of release() again to prevent leaks</font>
        if( message->msg_type() == <font color=#008888>ACE_Message_Block::MB_HANGUP</font> )
        {
            message->release();
            break;
        }

            <font color=red>// Display the message so that we can see if things are</font>
            <font color=red>// working the way we want.</font>
        ACE_DEBUG ((LM_INFO, "<font color=green>%d (%P|%t|%T)\tTest_T::svc() received message #%d (%s)\n</font>",
                    my_number,mcount,message->rd_ptr()));

            <font color=red>// Pretend that the work takes some time to complete.</font>
            <font color=red>// Remember, we're holding that lock during this time!</font>
        <font color=#008888>ACE_OS::sleep</font>(1);

            <font color=red>// No leaks...</font>
        message->release();
    }

        <font color=red>// Send a hangup to the other threads in the pool.  If we don't</font>
        <font color=red>// do this then wait() will never exit since all of the other</font>
        <font color=red>// threads are still blocked on getq().</font>
    this->send();
    
    return(0);
};

<font color=blue>#endif</font> <font color=red>// TEST_T_C</font>
</PRE>
<P><HR WIDTH="100%">
<CENTER>[<A HREF="../online-tutorials.html">Tutorial Index</A>] [<A HREF="page05.html">Continue This Tutorial</A>]</CENTER>