summaryrefslogtreecommitdiff
path: root/docs/tutorials/018/page04.html
blob: b83ab46f6534ee2cd006723c6cedaa51a73d6027 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
<HTML>
<HEAD>
   <META HTTP-EQUIV="Content-Type" CONTENT="text/html; charset=iso-8859-1">
   <META NAME="Author" CONTENT="James CE Johnson">
   <TITLE>ACE Tutorial 018</TITLE>
</HEAD>
<BODY TEXT="#000000" BGCOLOR="#FFFFFF" LINK="#000FFF" VLINK="#FF0F0F">

<CENTER><B><FONT SIZE=+2>ACE Tutorial 018</FONT></B></CENTER>

<CENTER><B><FONT SIZE=+2>The FIFO Nature of ACE_Token</FONT></B></CENTER>

<P>
<HR WIDTH="100%">
Our Test implementation looks much like the other ACE_Task derivatives 
we've used in the past.  The most obvious change is the addition of
the run() method.  run() will activate the threads and put a few
messages into the queue.  We could have done that in main() but it
just makes more sense here.
<P>
Notice how svc() grabs the guard after getting a message from the
queue.  Since we constructed our Task baseclass with ACE_MT_SYNCH, we
know that the queue is already thread-safe.  Our purpose in grabbing
the additional lock is to show how ACE_Token and ACE_Mutex behave
differently.  In a real app, you'd be doing this to protect shared
resources that the threads might clobber.
<HR>
<PRE>
<font color=red>// $Id$</font>

<font color=red>/* This is something new... Since we're included by the header, we
   have to provide a sentry to protect against recursive inclusion.
 */</font>
<font color=blue>#ifndef</font> <font color=purple>TEST_T_C</font>
<font color=blue>#define</font> <font color=purple>TEST_T_C</font>

<font color=red>// Get our definition</font>
<font color=blue>#include</font> "<font color=green>Test_T.h</font>"

<font color=red>// We'll hard-code the thread count.  Mucking around with that isn't</font>
<font color=red>// really the point of the exercise today...</font>
<font color=blue>#define</font> <font color=purple>TEST_THREAD_COUNT</font> 5

<font color=red>/* Construction time...
   Initialize the baseclass, the name and the barrier.  Since the
   client will probably invoke run() next, we go ahead and announce our
   creation to make the output more readable.
 */</font>
template &lt;class MUTEX>
Test_T&lt;MUTEX>::Test_T (const char *name)
  : ACE_Task&lt;ACE_MT_SYNCH>(),
    name_ (name),
    barrier_ (TEST_THREAD_COUNT)
{
  ACE_DEBUG ((LM_INFO,
              "<font color=green>(%P|%t|%T)\tTest_T (%s) created\n</font>",
              name));
}

<font color=red>/* Activate the threads and create some test data...
 */</font>
template &lt;class MUTEX> int 
Test_T&lt;MUTEX>::run (void)
{
  <font color=red>// Try to activate the set of threads that will test the mutex</font>
  if (this->open () == -1)
    return -1;

  <font color=red>// Create a set of messages.  I chose twice the thread count so that</font>
  <font color=red>// we can see how they get distributed.</font>
  for (int i = 0; i &lt; TEST_THREAD_COUNT*2; ++i)
    {
      <font color=red>// A message block big enough for a simple message.</font>
      ACE_Message_Block *message;

      ACE_NEW_RETURN (message,
                      ACE_Message_Block (64),
                      -1);

      <font color=red>// Put some text into the message block so that we can know</font>
      <font color=red>// what's going on when we get to svc()</font>
      sprintf (message->wr_ptr (),
               "<font color=green>Message Number %d</font>",
               i);
      message->wr_ptr (<font color=#008888>ACE_OS::strlen</font> (message->rd_ptr ()) + 1);

      <font color=red>// Send the message to the thread pool</font>
      if (this->send (message) == -1)
        break;
    }

  <font color=red>// Send a hangup to the thread pool so that we can exit.</font>
  if (this->send () == -1)
    return -1;

  <font color=red>// Wait for all of the threads to exit and then return to the client.</font>
  return this->wait ();
}

<font color=red>/* Send a message to the thread pool
 */</font>
template &lt;class MUTEX> int 
Test_T&lt;MUTEX>::send (ACE_Message_Block *message)
{
  <font color=red>// If no message was provided, create a hangup message.</font>
  if (message == 0)
    ACE_NEW_RETURN (message,
                    ACE_Message_Block (0,
                                       <font color=#008888>ACE_Message_Block::MB_HANGUP</font>),
                    -1);

  <font color=red>// Use the duplicate() method when sending the message.  For this</font>
  <font color=red>// simple application, that may be overkill but it's a good habit.</font>
  <font color=red>// duplicate() will increment the reference count so that each user</font>
  <font color=red>// of the message can release() it when done.  The last user to call</font>
  <font color=red>// release() will cause the data to be deleted.</font>
  if (this->putq (message->duplicate ()) == -1)
    {
      <font color=red>// Error?  release() the message block and return failure.</font>
      message->release ();
      return -1;
    }

  <font color=red>// release() the data to prevent memory leaks.</font>
  message->release();

  return 0;
}

<font color=red>/* A fairly typical open().  Just activate the set of threads and return.
 */</font>
template &lt;class MUTEX> int 
Test_T&lt;MUTEX>::open (void *arg)
{
  ACE_UNUSED_ARG(arg);
  return this->activate (THR_NEW_LWP,
                         TEST_THREAD_COUNT);
}

<font color=red>/* svc() is also fairly typical.  The new part is the use of the guard 
   to simulate protection of shared resources.
 */</font>
template &lt;class MUTEX> int 
Test_T&lt;MUTEX>::svc (void)
{
  <font color=red>// Keep a simple thread identifier.  We could always use the</font>
  <font color=red>// thread id but this is a nice, simple number.</font>
  int my_number = ++thread_num_;

  ACE_DEBUG ((LM_INFO,
              "<font color=green>%d (%P|%t|%T)\tTest_T::svc() Entry\n</font>",
              my_number));

  <font color=red>// Wait for all of the threads to get started so that they all have a</font>
  <font color=red>// fair shot at the message queue.  Comment this out and see how the</font>
  <font color=red>// behaviour changes.  Does it surprise you?</font>
  barrier_.wait ();

  ACE_Message_Block *message;
  int mcount = 0;

  <font color=red>// This would usually be an almost-infinite loop.  Instead, I've</font>
  <font color=red>// governed it so that no single thread can get more than "thread</font>
  <font color=red>// count" number of messages.  You'll see that with ACE_Mutex, this</font>
  <font color=red>// is just about the only way to keep the first thread from getting</font>
  <font color=red>// all the action.  This is obviously just for the sake of the test since</font>
  <font color=red>// you don't want your real-world app to exit after a fixed number</font>
  <font color=red>// of messages!</font>
  while (mcount &lt; TEST_THREAD_COUNT)
    {
      <font color=red>// Get a message.  Since the message queue is already</font>
      <font color=red>// thread-safe we don't have to guard it.  In fact, moving the</font>
      <font color=red>// guard up above getq() will decrease your parallelization.</font>
      if (getq (message) == -1)
        break;

      <font color=red>// Now we pretend that there are shared resources required to</font>
      <font color=red>// process the data.  We grab the mutex through the guard and</font>
      <font color=red>// "do work".  In a real application, you'll want to keep these</font>
      <font color=red>// critical sections as small as possible since they will reduce</font>
      <font color=red>// the usefulness of multi-threading.</font>
      guard_t guard (mutex_);

      <font color=red>// Increase our message count for the debug output and the</font>
      <font color=red>// governor.</font>
      ++mcount;

      <font color=red>// Check for a hangup request...  Notice the use of release()</font>
      <font color=red>// again to prevent leaks</font>
      if (message->msg_type () == <font color=#008888>ACE_Message_Block::MB_HANGUP</font>)
        {
          message->release ();
          break;
        }

      <font color=red>// Display the message so that we can see if things are working</font>
      <font color=red>// the way we want.</font>
      ACE_DEBUG ((LM_INFO,
                  "<font color=green>%d (%P|%t|%T)\tTest_T::svc() received message #%d (%s)\n</font>",
                  my_number,
                  mcount,
                  message->rd_ptr ()));

      <font color=red>// Pretend that the work takes some time to complete.  Remember,</font>
      <font color=red>// we're holding that lock during this time!</font>
      <font color=#008888>ACE_OS::sleep</font> (1);

      <font color=red>// No leaks...</font>
      message->release ();
    }

  <font color=red>// Send a hangup to the other threads in the pool.  If we don't do</font>
  <font color=red>// this then wait() will never exit since all of the other threads</font>
  <font color=red>// are still blocked on getq().</font>
  this->send ();
    
  return 0;
};

<font color=blue>#endif</font> <font color=red>/* TEST_T_C */</font>
</PRE>
<P><HR WIDTH="100%">
<CENTER>[<A HREF="../online-tutorials.html">Tutorial Index</A>] [<A HREF="page05.html">Continue This Tutorial</A>]</CENTER>