summaryrefslogtreecommitdiff
path: root/docs/tutorials/014/Task.cpp
blob: 51b4e540be5c4a2d15ecffec87ea18e06a2413ee (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210

// $Id$

// Task.cxx
//
// Tutorial regarding a way to use ACE_Stream.
//
// written by bob mcwhirter (bob@netwrench.com)
//
//

#include <ace/Message_Block.h>

#include "Task.h"

Task::Task(const char * nameOfTask,
	   int numberOfThreads)
  : d_numberOfThreads(numberOfThreads),
    d_barrier(numberOfThreads)
{
  // Just initialize our name, number of threads, and barrier.

  ACE_OS::strcpy(d_nameOfTask, nameOfTask);
}

Task::~Task(void)
{
  ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::~Task() -- once per Task\n", d_nameOfTask));
}

int Task::open(void *arg) 
{
  ACE_UNUSED_ARG(arg);

  ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::open() -- once per Task\n", d_nameOfTask));
  
  // call ACE_Task::activate() to spawn the threads using
  // our Task::svc() as the function to be run.

  // FMM -- Frequently Made Mistake --
  //  
  // If you specify the flag THR_DETACHED when activating the
  // Task, you will get an assert() violation during close(),
  // since the Task waits for all of its threads to rejoin.
  // 

  return this->activate(THR_NEW_LWP, d_numberOfThreads);
}

int Task::put(ACE_Message_Block *message,
	      ACE_Time_Value *timeout)
{
  // ACE_Stream uses the put() method of Tasks to send messages.
  // This defaultly does nothing.  Here we link our put() method
  // directly to our putq() method, so that Messages put() to us
  // will appear in the Message_Queue that is checked by the
  // service threads.
  
  return this->putq(message, timeout);
}

int Task::close(u_long flags)
{

  // When the Stream closes the Module, the Module then close()'s the Task
  // and passing a value of (1) as the flag.

  // When a service thread exits, it calls close() with a value that is not
  // (1).

  // We use this fact to tell the difference between closing a service thread,
  // and closing the main Task itself.

  if (flags == 1) {

    // The Module has asked to close the main Task.

    ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::close() -- flags == 1 -- once per Task\n", d_nameOfTask));

    // We create a Message_Block...

    ACE_Message_Block *hangupBlock = new ACE_Message_Block();

    // And make it of the type MB_HANGUP.  

    hangupBlock->msg_type(ACE_Message_Block::MB_HANGUP);

    // We then send this Block into the Message_Queue to be seen by the 
    // service threads.

    // Once again we duplicate() the Block as send it off...
    
    if (this->putq(hangupBlock->duplicate()) == -1) {
      ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "Task::close() putq"), -1);
    }
    
    // ..and we're free to release() our copy of it.

    hangupBlock->release();

    // Now, all we have to do is wait() for the service threads to all 
    // exit.  This is where using THR_DETACHED in the activate() method
    // will come back to haunt you.

    // The Stream waits until this returns before attempting to remove
    // the next Module/Task group in the Stream.  This allows for an
    // orderly shutting down of the Stream.

    return this->wait();


  } else {

    ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::close() -- flags != 1 -- once per servicing thread\n", d_nameOfTask));

    // This is where we can clean up any mess left over by each service thread.
    // In this Task, there is nothing to do.

  }

  return 0;

}

int Task::svc(void)
{

  // This is the function that our service threads run once they are spawned.

  ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::svc() -- once per servicing thread\n", d_nameOfTask));

  // First, we wait until all of our peer service threads have arrived
  // at this point also.

  d_barrier.wait();

  ACE_Message_Block *messageBlock;

  while (1) {

    // And now we loop almost infinitely.

    // getq() will block until a Message_Block is available to be read,
    // or an error occurs.

    if ( this->getq(messageBlock, 0) == -1) {
      ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "Task::svc() getq"), -1);
    }

    if (messageBlock->msg_type() == ACE_Message_Block::MB_HANGUP) {
      
      // If the Message_Block is of type MB_HANGUP, then we're being asked
      // to shut down nicely.

      ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::svc() -- HANGUP block received\n", d_nameOfTask));

      // So, we duplicate the Block, and put it back into the Message_Queue,
      // in case there are some more peer service threads still running.

      if (this->putq(messageBlock->duplicate()) == -1) {
	ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "Task::svc() putq"), -1);
      }

      // We release our copy of the Block.
      messageBlock->release();

      // And we break out of the nearly infinitely loop, and
      // head towards close() ourselves.
      break;
    }

    // If we're here, then we've received a Message_Block that was 
    // not informing us to quit, so we're assuming it's a valid
    // meaningful Block.

    ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::svc() -- Normal block received\n", d_nameOfTask));

    // We grab the read-pointer from the Block, and display it through a DEBUG statement.

    ACE_DEBUG ((LM_DEBUG, "(%P|%t) %s Task::svc() -- %s\n", d_nameOfTask, messageBlock->rd_ptr() ));

    // We pretend that this takes to time to process the Block.
    // If you're on a fast machine, you might have to raise this
    // value to actually witness different threads handling
    // blocks for each Task.

    ACE_OS::sleep (ACE_Time_Value (0, 250));

    // Since we're part of a Stream, we duplicate the Block, and 
    // send it on to the next Task.

    if (put_next(messageBlock->duplicate()) == -1) {
      ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "Task::svc() put_next"), -1);
    }
    
    // And then we release our copy of it.

    messageBlock->release();

  }

  return 0;

}


const char * Task::nameOfTask(void) const
{
  return d_nameOfTask;
}