summaryrefslogtreecommitdiff
path: root/docs/tutorials/014/page05.html
blob: 3fdb16a0be7a80ceff5f6ecfd90b4b10ebc0ab47 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
<!-- $Id$ -->
<HTML>
<HEAD>
   <META HTTP-EQUIV="Content-Type" CONTENT="text/html; charset=iso-8859-1">
   <META NAME="Author" CONTENT="Bob McWhirter">
   <TITLE>ACE Tutorial 014</TITLE>
</HEAD>
<BODY TEXT="#000000" BGCOLOR="#FFFFFF" LINK="#000FFF" VLINK="#FF0F0F">

<CENTER><B><FONT SIZE=+2>ACE Tutorial 014</FONT></B></CENTER>

<CENTER><B><FONT SIZE=+2>ACE_Stream Tutorial, Of Sorts</FONT></B></CENTER>

<P>
<HR WIDTH="100%">
<P>
Now we come to main().  In the previous task-chain tutorial
    every thread pool had to have the same number of threads.  This
    time around, we leverage the construction method of ACE_Stream
    and ACE_Module to customize the thread-pool size in each
    ACE_Task of the stream.
<P>
Remember EndTask from the previous page?  We create one here and push
    it into the stream to take care of cleaning up the messages.
    Technically, we could have replaced the default Tail task
    created by the ACE framework but it seems to make more sense to
    just push our "tail" onto the stream like the other tasks.  The
    caveat to this method is that you must be sure you don't push()
    any other Modules behind the EndTask!
<P>
Once the stream of modules containing tasks is all set up then we can
    put() some data into the stream for processing.  The clever use
    of Task::close() makes shutting down the stream easier than
    ever.  No messing with hangup messages at the application level,
    just close() when you're done!  What could be simpler?
<P>
<HR WIDTH="100%">
<PRE>

<font color=red>// $Id$</font>

<font color=red>// stream.cxx</font>
<font color=red>//</font>
<font color=red>// Tutorial regarding a way to use ACE_Stream.</font>
<font color=red>//</font>
<font color=red>// written by bob mcwhirter (bob@netwrench.com)</font>
<font color=red>//</font>
<font color=red>//</font>

<font color=blue>#include</font> "<font color=green>Task.h</font>"
<font color=blue>#include</font> "<font color=green>EndTask.h</font>"
<font color=red>// This is our specialized ACE_Task.</font>

<font color=blue>#include</font> &lt;ace/Module.h>
<font color=blue>#include</font> &lt;ace/Stream.h>
<font color=blue>#include</font> &lt;ace/streams.h>
<font color=red>// These are the necessary ACE headers.</font>


typedef ACE_Module&lt;ACE_MT_SYNCH> Module;
typedef ACE_Stream&lt;ACE_MT_SYNCH> Stream;
<font color=red>// Just to avoid a lot of typing, typedefs</font>
<font color=red>// are generally a good idea.</font>

int main(int argc, char *argv[])
{
  int numberOfMessages = argc > 1 ? <font color=#008888>ACE_OS::atoi</font>(argv[1]) : 3;
  <font color=red>// unless otherwise specified, just send three messages</font>
  <font color=red>// down the stream.</font>

  Stream theStream;
  <font color=red>// the ACE_Stream itself.</font>

  <font color=red>// Now, we instantiate 4 different Tasks.  These do not</font>
  <font color=red>// need to be all the same class, but they do need to</font>
  <font color=red>// all derive from the same flavor of ACE_Task.</font>
  <font color=red>//</font>
  <font color=red>// Also, we instantiate a fifth end-cap Task to clean</font>
  <font color=red>// up Message_Blocks as they reach the end.</font>

  Task *taskOne;
  Task *taskTwo;
  Task *taskThree;
  Task *taskFour;
  Task *taskEnd;

  <font color=red>// Our Tasks take two arguments: a name, and the number</font>
  <font color=red>// of threads to dedicate to the task.</font>

  taskOne = new Task("<font color=green>Task No. 1</font>", 1);
  taskTwo = new Task("<font color=green>Task No. 2</font>", 3);
  taskThree = new Task("<font color=green>Task No. 3</font>", 7);
  taskFour = new Task("<font color=green>Task No. 4</font>", 1);

  <font color=red>// Our EndTask only takes 1 argument, as it actually</font>
  <font color=red>// doesn't spawn any threads for processing.</font>

  taskEnd = new EndTask("<font color=green>End Task</font>");

  Module *moduleOne;
  Module *moduleTwo;
  Module *moduleThree;
  Module *moduleFour;
  Module *moduleEnd;

  <font color=red>// ACE_Stream accepts ACE_Modules, which are simply a pair of</font>
  <font color=red>// ACE_Tasks.  One is dedicated for writing, while the other</font>
  <font color=red>// is dedicated to reading.  Think of the writing side as</font>
  <font color=red>// downstream, and the reading side as upstream.</font>
  <font color=red>//</font>
  <font color=red>// We're only working with a unidirectional Stream today,</font>
  <font color=red>// so we'll only actually install a Task into the write</font>
  <font color=red>// side of the module, effectively downstream.</font>

  moduleOne = new Module("<font color=green>Module No. 1</font>", taskOne);
  moduleTwo = new Module("<font color=green>Module No. 2</font>", taskTwo);
  moduleThree = new Module("<font color=green>Module No. 3</font>", taskThree);
  moduleFour = new Module("<font color=green>Module No. 4</font>", taskFour);
  moduleEnd = new Module("<font color=green>Module End</font>", taskEnd);

  <font color=red>// Now we push the Modules onto the Stream.</font>
  <font color=red>// Pushing adds the module to the head, or</font>
  <font color=red>// otherwise prepends it to whatever modules</font>
  <font color=red>// are already installed.</font>

  <font color=red>// So, you need to push() the modules on -backwards-</font>
  <font color=red>// from our viewpoint.</font>

  if (theStream.push(moduleEnd) == -1) {
           ACE_ERROR_RETURN ((LM_ERROR, "<font color=green>%p\n</font>", "<font color=green>push</font>"), -1);
  }

  if (theStream.push(moduleFour) == -1) {
        ACE_ERROR_RETURN ((LM_ERROR, "<font color=green>%p\n</font>", "<font color=green>push</font>"), -1);
  }

  <font color=red>// As we push a Module onto the Stream, it gets opened.</font>
  <font color=red>// When a Module open()s, it opens the Tasks that it contains.</font>
  <font color=red>//</font>
  <font color=red>// Since we cannot provide an argument to this embedded</font>
  <font color=red>// call to open(), we specified the number of</font>
  <font color=red>// threads in the constructor of our Tasks.</font>

  if (theStream.push(moduleThree) == -1) {
        ACE_ERROR_RETURN ((LM_ERROR, "<font color=green>%p\n</font>", "<font color=green>push</font>"), -1);
  }

  if (theStream.push(moduleTwo) == -1) {
        ACE_ERROR_RETURN ((LM_ERROR, "<font color=green>%p\n</font>", "<font color=green>push</font>"), -1);
  }

  if (theStream.push(moduleOne) == -1) {
        ACE_ERROR_RETURN ((LM_ERROR, "<font color=green>%p\n</font>", "<font color=green>push</font>"), -1);
  }

  <font color=red>// Now that the Modules are open, the Tasks' threads should</font>
  <font color=red>// be launching and entering their svc() loop, so we send</font>
  <font color=red>// some messages down the Stream.</font>

  int sent = 1;

  ACE_Message_Block *message;

  while (sent &lt;= numberOfMessages) {

    <font color=red>// First, create ourselves a Message_Block.</font>
    <font color=red>// see Tutorials 10-13 for more information</font>
    <font color=red>// about Message_Blocks and Message_Queues.</font>

    message = new ACE_Message_Block(128);

    <font color=red>// Now, we grab the write-pointer from the Block,</font>
    <font color=red>// and sprintf() our text into it.</font>

    <font color=#008888>ACE_OS::sprintf</font>(message->wr_ptr(), "<font color=green>Message No. %d</font>", sent);

    <font color=red>// All we have to do now is drop the Message_Block</font>
    <font color=red>// into the Stream.</font>

    <font color=red>// It is always a good idea to duplicate() a Message_Block</font>
    <font color=red>// when you put it into any Message_Queue, as then</font>
    <font color=red>// you are always free to release() your copy</font>
    <font color=red>// without worry.</font>

    if (theStream.put(message->duplicate(), 0) == -1) {
      ACE_ERROR_RETURN ((LM_ERROR, "<font color=green>%p\n</font>", "<font color=green>put</font>"), -1);
    }

    message->release();
    ++sent;
  }

  <font color=red>// Now that we've sent our Message_Blocks, close down</font>
  <font color=red>// the Stream.</font>
  <font color=red>//</font>
  <font color=red>// The Stream will automagically delete the Modules and</font>
  <font color=red>// the contained Tasks.  We don't have to do that.</font>
  <font color=red>//</font>
  <font color=red>// This call will block (due to the way we've written our</font>
  <font color=red>// Task class) until all Message_Blocks have cleared the</font>
  <font color=red>// entire Stream, and all associated threads have exited.</font>

  theStream.close();

  return 0;
}
</PRE>
<P><HR WIDTH="100%">
<CENTER>[<A HREF="../online-tutorials.html">Tutorial Index</A>] </CENTER>