summaryrefslogtreecommitdiff
path: root/docs/tutorials/014/stream.cpp
blob: 0e2b784b2f87c3433e3ae5675eb783222329e192 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168

// $Id$

// stream.cxx
//
// Tutorial regarding a way to use ACE_Stream.
//
// written by bob mcwhirter (bob@netwrench.com)
//
//

#include "Task.h"
#include "EndTask.h"
// This is our specialized ACE_Task.

#include <ace/Module.h>
#include <ace/Stream.h>
#include <ace/streams.h>
// These are the necessary ACE headers.


// Short aliases for the ACE_Module and ACE_Stream templates, both
// instantiated with the ACE_MT_SYNCH synchronization traits.
typedef ACE_Module<ACE_MT_SYNCH> Module;
typedef ACE_Stream<ACE_MT_SYNCH> Stream;
// Just to avoid a lot of typing, typedefs
// are generally a good idea.

int main(int argc, char *argv[])
{
  int numberOfMessages = argc > 1 ? ACE_OS::atoi(argv[1]) : 3;
  // unless otherwise specified, just send three messages
  // down the stream.

  Stream theStream;
  // the ACE_Stream itself.

  // Now, we instantiate 4 different Tasks.  These do not
  // need to be all the same class, but they do need to
  // all derrive from the same flavor of ACE_Task.
  //
  // Also, we instantiate a fifth end-cap Task to clean
  // up Message_Blocks as they reach the end.

  Task *taskOne;
  Task *taskTwo;
  Task *taskThree;
  Task *taskFour;
  Task *taskEnd;

  // Out Task's take two arguments: a name, and the number
  // of threads to dedicate to the task.

  taskOne = new Task("Task No. 1", 1);
  taskTwo = new Task("Task No. 2", 3);
  taskThree = new Task("Task No. 3", 7);
  taskFour = new Task("Task No. 4", 1);

  // Our EndTask only takes 1 argument, as it actually
  // doesn't spawn any threads for processing.

  taskEnd = new EndTask("End Task");

  Module *moduleOne;
  Module *moduleTwo;
  Module *moduleThree;
  Module *moduleFour;
  Module *moduleEnd;

  // ACE_Stream accepts ACE_Modules, which are simply a pair of
  // ACE_Tasks.  One is dedicated for writing, while the other
  // is dedicated to reading.  Think of the writing side as
  // downstream, and the reading side as upstream.
  //
  // We're only working with a unidirection Stream today,
  // so we'll only actually install a Task into the write
  // side of the module, effectively downstream.

  moduleOne = new Module("Module No. 1", taskOne);
  moduleTwo = new Module("Module No. 2", taskTwo);
  moduleThree = new Module("Module No. 3", taskThree);
  moduleFour = new Module("Module No. 4", taskFour);
  moduleEnd = new Module("Module End", taskEnd);

  // Now we push the Modules onto the Stream.
  // Pushing adds the module to the head, or
  // otherwise prepends it to whatever modules
  // are already installed.

  // So, you need to push() the modules on -backwards-
  // from our viewpoint.

  if (theStream.push(moduleEnd) == -1) {
           ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), -1);
  }

  if (theStream.push(moduleFour) == -1) {
        ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), -1);
  }

  // As we push a Module onto the Stream, it gets opened.
  // When a Module open()s, it opens the Tasks that it contains.
  //
  // Since we cannot provide an argument to this embedded
  // call to open(), we supplied specified the number of
  // threads in the constructor of our Tasks.

  if (theStream.push(moduleThree) == -1) {
        ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), -1);
  }

  if (theStream.push(moduleTwo) == -1) {
        ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), -1);
  }

  if (theStream.push(moduleOne) == -1) {
        ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), -1);
  }

  // Now that the Modules are open, the Tasks threads should
  // be launching and entering their svc() loop, so we send
  // some messages down the Stream.

  int sent = 1;

  ACE_Message_Block *message;

  while (sent <= numberOfMessages) {

    // First, create ourselves a Message_Block.
    // see Tutorials 10-13 for more information
    // about Message_Blocks and Message_Queues.

    message = new ACE_Message_Block(128);

    // Now, we grab the write-pointer from the Block,
    // and sprintf() our text into it.

    ACE_OS::sprintf(message->wr_ptr(), "Message No. %d", sent);

    // All we have to do now is drop the Message_Block
    // into the Stream.

    // It is always a good idea to duplicate() a Message_Block
    // when you put it into any Message_Queue, as then
    // you can always be allowed to release() your copy
    // without worry.

    if (theStream.put(message->duplicate(), 0) == -1) {
      ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "put"), -1);
    }

    message->release();
    ++sent;
  }

  // Now that we've sent our Message_Blocks, close down
  // the Stream.
  //
  // The Stream will automagically delete the Modules and
  // the contained Tasks.  We don't have to do that.
  //
  // This call will block (due to the way we've written our
  // Task class) until all Message_Blocks have cleared the
  // entire Stream, and all associated threads have exited.

  theStream.close();

  return 0;
}