summaryrefslogtreecommitdiff
path: root/docs/tutorials/014/page03.html
blob: 114179dd170e0a075a55aa078d4c00929c37bd19 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
<HTML>
<HEAD>
   <META HTTP-EQUIV="Content-Type" CONTENT="text/html; charset=iso-8859-1">
   <META NAME="Author" CONTENT="Bob McWhirter">
   <TITLE>ACE Tutorial 014</TITLE>
</HEAD>
<BODY TEXT="#000000" BGCOLOR="#FFFFFF" LINK="#000FFF" VLINK="#FF0F0F">

<CENTER><B><FONT SIZE=+2>ACE Tutorial 014</FONT></B></CENTER>

<CENTER><B><FONT SIZE=+2>ACE_Stream Tutorial, Of Sorts</FONT></B></CENTER>

<P>
<HR WIDTH="100%">
<P>
Before we get to main() let's take a look at the Task implementation.
	  While we've overridden several methods, the real work is done in 
	  the close() and svc() methods.
<P>
Notice how close() figures out if it is being called by the shutdown
	  of the ACE_Stream or by the exit of svc().  The magic here is
	  provided by the <i>flags</i> parameter.  By handling the stream
	  shutdown in this way, we don't have to do anything strange in
	  svc().  We also don't end up with extra hangup messages in the
	  queue when the dust all settles down.
<P>
Like our other tutorials, svc() looks for a hangup and processes data.
<P>
<HR WIDTH="100%">
<PRE>

<font color=red>// $Id$</font>

<font color=red>// Task.cxx</font>
<font color=red>//</font>
<font color=red>// Tutorial regarding a way to use ACE_Stream.</font>
<font color=red>//</font>
<font color=red>// written by bob mcwhirter (bob@netwrench.com)</font>
<font color=red>//</font>
<font color=red>//</font>

<font color=blue>#include</font> &lt;ace/Message_Block.h>

<font color=blue>#include</font> "<font color=green>Task.h</font>"

<font color=red>// Construct a Task from a name and a thread count.</font>
<font color=red>//</font>
<font color=red>//   nameOfTask      - text copied into d_nameOfTask.</font>
<font color=red>//   numberOfThreads - stored in d_numberOfThreads and also used to</font>
<font color=red>//                     construct d_barrier.</font>
<font color=#008888>Task::Task</font>(const char * nameOfTask,
	   int numberOfThreads)
  : d_numberOfThreads(numberOfThreads),
    d_barrier(numberOfThreads)
{
  <font color=red>// Initialize our name; the thread count and barrier were set above.</font>
  <font color=red>// NOTE(review): no length is passed to the copy below -- confirm that</font>
  <font color=red>// d_nameOfTask (declared in Task.h, not shown here) can hold every name.</font>

  <font color=#008888>ACE_OS::strcpy</font>(d_nameOfTask, nameOfTask);
}

<font color=red>// Destructor.  It only emits a debug trace that names this Task; no</font>
<font color=red>// other cleanup is done here.</font>
<font color=#008888>Task::~Task</font>(void)
{
  ACE_DEBUG ((LM_DEBUG, "<font color=green>(%P|%t) %s <font color=#008888>Task::~Task</font>() -- once per Task\n</font>", d_nameOfTask));
}

<font color=red>// Start the Task by activating d_numberOfThreads threads.</font>
<font color=red>//</font>
<font color=red>//   arg - unused.</font>
<font color=red>//</font>
<font color=red>// Returns whatever activate() returns.</font>
int <font color=#008888>Task::open</font>(void *arg) 
{
  ACE_UNUSED_ARG(arg);

  ACE_DEBUG ((LM_DEBUG, "<font color=green>(%P|%t) %s <font color=#008888>Task::open</font>() -- once per Task\n</font>", d_nameOfTask));
  
  <font color=red>// Call <font color=#008888>ACE_Task::activate</font>() to spawn the threads, using</font>
  <font color=red>// our <font color=#008888>Task::svc</font>() as the function to be run.</font>

  <font color=red>// FMM -- Frequently Made Mistake --</font>
  <font color=red>//  </font>
  <font color=red>// If you specify the flag THR_DETACHED when activating the</font>
  <font color=red>// Task, you will get an assert() violation during close(),</font>
  <font color=red>// since the Task waits there for all of its threads to rejoin.</font>
  <font color=red>// </font>

  return this->activate(THR_NEW_LWP, d_numberOfThreads);
}

<font color=red>// Accept a message for this Task by forwarding it to putq().</font>
<font color=red>//</font>
<font color=red>//   message - the block to enqueue.</font>
<font color=red>//   timeout - passed through to putq() unchanged.</font>
<font color=red>//</font>
<font color=red>// Returns whatever putq() returns.</font>
int <font color=#008888>Task::put</font>(ACE_Message_Block *message,
	      ACE_Time_Value *timeout)
{
  <font color=red>// ACE_Stream uses the put() method of Tasks to send messages.</font>
  <font color=red>// By default it does nothing.  Here we link our put() method</font>
  <font color=red>// directly to our putq() method, so that Messages put() to us</font>
  <font color=red>// will appear in the Message_Queue that is checked by the</font>
  <font color=red>// service threads.</font>
  
  return this->putq(message, timeout);
}

<font color=red>// Shut down either the whole Task or a single service thread,</font>
<font color=red>// depending on flags.</font>
<font color=red>//</font>
<font color=red>//   flags - 1 means the Task itself is being closed: a hangup block is</font>
<font color=red>//           queued and we wait() for the service threads.  Any other</font>
<font color=red>//           value is treated as a service thread exiting.</font>
<font color=red>//</font>
<font color=red>// Returns the result of wait() when flags is 1, -1 if queueing the</font>
<font color=red>// hangup block fails, and 0 otherwise.</font>
int <font color=#008888>Task::close</font>(u_long flags)
{

  <font color=red>// When the Stream closes the Module, the Module then close()'s the Task,</font>
  <font color=red>// passing a value of (1) as the flag.</font>

  <font color=red>// When a service thread exits, it calls close() with a value that is not</font>
  <font color=red>// (1).</font>

  <font color=red>// We use this fact to tell the difference between closing a service thread</font>
  <font color=red>// and closing the main Task itself.</font>

  if (flags == 1) {

    <font color=red>// The Module has asked to close the main Task.</font>

    ACE_DEBUG ((LM_DEBUG, "<font color=green>(%P|%t) %s <font color=#008888>Task::close</font>() -- flags == 1 -- once per Task\n</font>", d_nameOfTask));

    <font color=red>// We create a Message_Block...</font>

    ACE_Message_Block *hangupBlock = new ACE_Message_Block();

    <font color=red>// And make it of the type MB_HANGUP.  </font>

    hangupBlock->msg_type(<font color=#008888>ACE_Message_Block::MB_HANGUP</font>);

    <font color=red>// We then send this Block into the Message_Queue to be seen by the </font>
    <font color=red>// service threads.</font>

    <font color=red>// Once again we duplicate() the Block and send it off...</font>
    <font color=red>// NOTE(review): if putq() fails we return before any release() call,</font>
    <font color=red>// so both hangupBlock and its duplicate look leaked on that path.</font>
    
    if (this->putq(hangupBlock->duplicate()) == -1) {
      ACE_ERROR_RETURN ((LM_ERROR, "<font color=green>%p\n</font>", "<font color=green><font color=#008888>Task::close</font>() putq</font>"), -1);
    }
    
    <font color=red>// ...and we're free to release() our copy of it.</font>

    hangupBlock->release();

    <font color=red>// Now, all we have to do is wait() for the service threads to all </font>
    <font color=red>// exit.  This is where using THR_DETACHED in the activate() method</font>
    <font color=red>// will come back to haunt you.</font>

    <font color=red>// The Stream waits until this returns before attempting to remove</font>
    <font color=red>// the next Module/Task group in the Stream.  This allows for an</font>
    <font color=red>// orderly shutting down of the Stream.</font>

    return this->wait();


  } else {

    ACE_DEBUG ((LM_DEBUG, "<font color=green>(%P|%t) %s <font color=#008888>Task::close</font>() -- flags != 1 -- once per servicing thread\n</font>", d_nameOfTask));

    <font color=red>// This is where we can clean up any mess left over by each service thread.</font>
    <font color=red>// In this Task, there is nothing to do.</font>

  }

  return 0;

}

<font color=red>// Body of each service thread: wait on the barrier, then loop reading</font>
<font color=red>// blocks with getq().  A MB_HANGUP block is re-queued for the peer</font>
<font color=red>// threads and ends the loop; any other block is logged and forwarded</font>
<font color=red>// with put_next().</font>
<font color=red>//</font>
<font color=red>// Returns 0 after a hangup, or -1 (via ACE_ERROR_RETURN) if getq(),</font>
<font color=red>// putq() or put_next() fails.</font>
int <font color=#008888>Task::svc</font>(void)
{

  <font color=red>// This is the function that our service threads run once they are spawned.</font>

  ACE_DEBUG ((LM_DEBUG, "<font color=green>(%P|%t) %s <font color=#008888>Task::svc</font>() -- once per servicing thread\n</font>", d_nameOfTask));

  <font color=red>// First, we wait until all of our peer service threads have arrived</font>
  <font color=red>// at this point also.</font>

  d_barrier.wait();

  ACE_Message_Block *messageBlock;

  while (1) {

    <font color=red>// And now we loop until a hangup arrives or an error occurs.</font>

    <font color=red>// getq() will block until a Message_Block is available to be read,</font>
    <font color=red>// or an error occurs.</font>

    if ( this->getq(messageBlock, 0) == -1) {
      ACE_ERROR_RETURN ((LM_ERROR, "<font color=green>%p\n</font>", "<font color=green><font color=#008888>Task::svc</font>() getq</font>"), -1);
    }

    if (messageBlock->msg_type() == <font color=#008888>ACE_Message_Block::MB_HANGUP</font>) {
      
      <font color=red>// If the Message_Block is of type MB_HANGUP, then we're being asked</font>
      <font color=red>// to shut down nicely.</font>

      ACE_DEBUG ((LM_DEBUG, "<font color=green>(%P|%t) %s <font color=#008888>Task::svc</font>() -- HANGUP block received\n</font>", d_nameOfTask));

      <font color=red>// So, we duplicate the Block, and put it back into the Message_Queue,</font>
      <font color=red>// in case there are some more peer service threads still running.</font>
      <font color=red>// NOTE(review): the error return below skips the release() calls.</font>

      if (this->putq(messageBlock->duplicate()) == -1) {
	ACE_ERROR_RETURN ((LM_ERROR, "<font color=green>%p\n</font>", "<font color=green><font color=#008888>Task::svc</font>() putq</font>"), -1);
      }

      <font color=red>// We release our copy of the Block.</font>
      messageBlock->release();

      <font color=red>// And we break out of the nearly infinite loop, and</font>
      <font color=red>// head towards close() ourselves.</font>
      break;
    }

    <font color=red>// If we're here, then we've received a Message_Block that was </font>
    <font color=red>// not informing us to quit, so we're assuming it's a valid</font>
    <font color=red>// meaningful Block.</font>

    ACE_DEBUG ((LM_DEBUG, "<font color=green>(%P|%t) %s <font color=#008888>Task::svc</font>() -- Normal block received\n</font>", d_nameOfTask));

    <font color=red>// We grab the read-pointer from the Block, and display it through a DEBUG statement.</font>
    <font color=red>// NOTE(review): %s presumably needs NUL-terminated text at rd_ptr() -- confirm with the producer.</font>

    ACE_DEBUG ((LM_DEBUG, "<font color=green>(%P|%t) %s <font color=#008888>Task::svc</font>() -- %s\n</font>", d_nameOfTask, messageBlock->rd_ptr() ));

    <font color=red>// We pretend that it takes some time to process the Block.</font>
    <font color=red>// If you're on a fast machine, you might have to raise this</font>
    <font color=red>// value to actually witness different threads handling</font>
    <font color=red>// blocks for each Task.</font>

    <font color=#008888>ACE_OS::sleep</font> (ACE_Time_Value (0, 250));

    <font color=red>// Since we're part of a Stream, we duplicate the Block, and </font>
    <font color=red>// send it on to the next Task.</font>
    <font color=red>// NOTE(review): the error return below skips the release() calls.</font>

    if (put_next(messageBlock->duplicate()) == -1) {
      ACE_ERROR_RETURN ((LM_ERROR, "<font color=green>%p\n</font>", "<font color=green><font color=#008888>Task::svc</font>() put_next</font>"), -1);
    }
    
    <font color=red>// And then we release our copy of it.</font>

    messageBlock->release();

  }

  return 0;

}


<font color=red>// Accessor: returns the name held in d_nameOfTask.</font>
const char * <font color=#008888>Task::nameOfTask</font>(void) const
{
  return d_nameOfTask;
}
</PRE>
<P><HR WIDTH="100%">
<CENTER>[<A HREF="../online-tutorials.html">Tutorial Index</A>] [<A HREF="page04.html">Continue This Tutorial</A>]</CENTER>