// TAO/tao/CSD_ThreadPool/CSD_TP_Task.cpp
// $Id$

#include "tao/CSD_ThreadPool/CSD_TP_Task.h"
#include "tao/CSD_ThreadPool/CSD_TP_Request.h"
#include "tao/CSD_ThreadPool/CSD_TP_Dispatchable_Visitor.h"
#include "tao/CSD_ThreadPool/CSD_TP_Cancel_Visitor.h"

ACE_RCSID (CSD_ThreadPool,
           TP_Task,
           "$Id$")

#if !defined (__ACE_INLINE__)
# include "tao/CSD_ThreadPool/CSD_TP_Task.inl"
#endif /* ! __ACE_INLINE__ */
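
// A quick note on the synchronization members used throughout this file
// (all used below via this->; see the TP_Task header for declarations):
//
//   lock_            - serializes access to all task state (the queue_,
//                      the flags, and the thread counters).
//   work_available_  - condition signaled when a request is queued or a
//                      servant becomes ready again, and broadcast at
//                      shutdown; worker threads block on it in svc()
//                      when nothing is dispatchable.
//   active_workers_  - condition used to rendezvous with worker threads
//                      as they start up (open()) and shut down (close()).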

TAO_BEGIN_VERSIONED_NAMESPACE_DECL

TAO::CSD::TP_Task::~TP_Task()
{
}


bool
TAO::CSD::TP_Task::add_request(TP_Request* request)
{
  GuardType guard(this->lock_);

  if (!this->accepting_requests_)
    {
      ACE_DEBUG((LM_DEBUG,"(%P|%t) TP_Task::add_request() - "
                 "not accepting requests\n"));
      return false;
    }

  // We have made the decision that the request is going to be placed upon
  // the queue_.  Inform the request that it is about to be placed into
  // a request queue.  Some requests may not need to do anything in
  // preparation for being placed into a queue.  Others, however, may need
  // to perform a "clone" operation on some underlying request data before
  // the request can be properly placed into a queue.
  request->prepare_for_queue();

  this->queue_.put(request);

  this->work_available_.signal();

  return true;
}
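
// A minimal caller-side sketch for add_request() (illustrative only:
// the 'task' and 'request' variables are assumptions, typically set up
// by the owning strategy object, and are not taken from this file):
//
//   TAO::CSD::TP_Request_Handle request = ...;
//   if (!task.add_request(request.in()))
//     {
//       // The task rejected the request (open() has not completed, or
//       // shutdown has begun); ownership stays with the caller.
//     }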


int
TAO::CSD::TP_Task::open(void* num_threads_ptr)
{
  Thread_Counter num = 1;

  if (num_threads_ptr != 0)
    {
      // The caller is expected to pass a pointer to a Thread_Counter.
      // A static_cast of a non-null void* can never yield a null
      // pointer, so no additional null check is needed here.
      num = *static_cast<Thread_Counter*> (num_threads_ptr);
    }

  // We can't activate 0 threads.  Make sure this isn't the case.
  if (num < 1)
    {
      ACE_ERROR_RETURN((LM_ERROR,
                        "(%P|%t) TP_Task failed to open.  "
                        "num_threads (%u) is less-than 1.\n",
                        num),
                       -1);
    }

  // Likewise, we can't activate too many.  Make sure this isn't the case.
  if (num > MAX_THREADPOOL_TASK_WORKER_THREADS)
    {
      ACE_ERROR_RETURN((LM_ERROR,
                        "(%P|%t) TP_Task failed to open.  "
                        "num_threads (%u) is too large.  Max is %d.\n",
                        num, MAX_THREADPOOL_TASK_WORKER_THREADS),
                        -1);
    }

  // We need the lock acquired from here on out.
  GuardType guard(this->lock_);

  // We can assume that we are in the proper state to handle this open()
  // call as long as we haven't been open()'ed before.
  if (this->opened_)
    {
      ACE_ERROR_RETURN((LM_ERROR,
                        "(%P|%t) TP_Task failed to open.  "
                        "Task has previously been open()'ed.\n"),
                       -1);
    }

  // Activate this task object with 'num' worker threads.
  if (this->activate(THR_NEW_LWP | THR_JOINABLE, num) != 0)
    {
      // Assumes that when activate returns non-zero return code that
      // no threads were activated.
      ACE_ERROR_RETURN((LM_ERROR,
                        "(%P|%t) TP_Task failed to activate "
                        "(%d) worker threads.\n",
                        num),
                       -1);
    }

  // We have now passed the point at which this task counts as having
  // been open()'ed.
  this->opened_ = true;

  // Now we wait until all of the threads have started.
  while (this->num_threads_ != num)
    {
      this->active_workers_.wait();
    }

  // We can now accept requests (via our add_request() method).
  this->accepting_requests_ = true;

  return 0;
}
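
// A minimal sketch of how open() is typically driven (illustrative: the
// caller and the count of 4 are assumptions, not taken from this file;
// Thread_Counter is the same type used by open() above):
//
//   Thread_Counter num_threads = 4;
//   if (task.open(&num_threads) != 0)
//     {
//       // No worker threads were activated; the task is unusable.
//     }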


int
TAO::CSD::TP_Task::svc()
{
  // Account for this current worker thread having started the
  // execution of this svc() method.
  {
    GuardType guard(this->lock_);
    // Record this thread's id in a collection that is later used to check
    // whether ORB shutdown is being invoked by one of the threads in the pool.
    ACE_thread_t thr_id = ACE_OS::thr_self ();
    if (this->activated_threads_.set(thr_id, this->num_threads_) == -1)
      {
        ACE_ERROR_RETURN((LM_ERROR,
          ACE_TEXT("(%P|%t) TP_Task::svc: number of threads is out of range\n")),
          0);
      }
    ++this->num_threads_;
    this->active_workers_.signal();
  }
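
  // The signal() above pairs with the active_workers_.wait() loop in
  // open(): open() blocks until num_threads_ reaches the requested
  // count before the task starts accepting requests.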

  // This visitor object will be re-used over and over again as part of
  // the "GetWork" logic below.
  TP_Dispatchable_Visitor dispatchable_visitor;

  // Start the "GetWork-And-PerformWork" loop for the current worker thread.
  while (1)
    {
      TP_Request_Handle request;

      // Do the "GetWork" step.
      {
        // Acquire the lock until just before we decide to "PerformWork".
        GuardType guard(this->lock_);

        // Start the "GetWork" loop.
        while (request.is_nil())
        {
          if (this->shutdown_initiated_)
            {
              // This breaks us out of all loops with one fell swoop.
              return 0;
            }

          // There is no need to visit the queue if it is empty.
          if (!this->queue_.is_empty())
            {
              // Reset the visitor since we use it over and over.  This
              // will cause the visitor to drop any reference to
              // a request that it may still be holding from a prior
              // call to accept_visitor().
              dispatchable_visitor.reset();

              // Visit the requests in the queue in hopes of locating
              // the first "dispatchable" (ie, not busy) request.  If one
              // is located, the visitor extracts it from the queue and
              // saves it in a handle data member.
              this->queue_.accept_visitor(dispatchable_visitor);

              // Grab a "copy" of the extracted request from the visitor
              // (or a NULL pointer if the visitor didn't locate/extract
              // one).
              request = dispatchable_visitor.request();
            }

          // Either the queue is empty or we couldn't find any dispatchable
          // requests in the queue at this time.
          if (request.is_nil())
            {
              // Let's wait until we hear about the possibility of
              // work before we go look again.
              this->work_available_.wait();
            }
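
          // Note that wait() releases lock_ while blocked and re-acquires
          // it before returning; the enclosing "while (request.is_nil())"
          // loop then re-checks the state, so spurious wakeups are safe.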
        }

        // We have dropped out of the "while (request.is_nil())" loop.
        // We only get here if we located/extracted a dispatchable request
        // from the queue.  Note that the visitor will have already
        // marked the target servant as busy (on our behalf).
        // We can now safely release the lock.
      }

      // Do the "PerformWork" step.  We don't need the lock_ to do this.
      request->dispatch();

      // Now that the request has been dispatched, we need to mark the target
      // servant as no longer being busy, and we need to signal any wait()'ing
      // worker threads that there may be some dispatchable requests in the
      // queue now for this not-busy servant.  We need the lock_ to do this.
      {
        GuardType guard(this->lock_);
        request->mark_as_ready();
        this->work_available_.signal();
      }

      // Note that the request will be "released" here when the request
      // handle falls out of scope and its destructor performs the
      // _remove_ref() call on the underlying TP_Request object.
    }

  // This will never get executed.
  return 0;
}


int
TAO::CSD::TP_Task::close(u_long flag)
{
  GuardType guard(this->lock_);

  if (flag == 0)
    {
      // Worker thread is closing.
      --this->num_threads_;
      this->active_workers_.signal();
    }
  else
    {
      // Strategy object is shutting down the task.

      // Do nothing if this task has never been open()'ed.
      if (!this->opened_)
        {
          return 0;
        }

      // Set the shutdown flag to true.
      this->shutdown_initiated_ = true;

      // Stop accepting requests.
      this->accepting_requests_ = false;

      // Signal all worker threads waiting on the work_available_ condition.
      this->work_available_.broadcast();

      size_t num_waiting_threads = 0;

      ACE_thread_t my_thr_id = ACE_OS::thr_self ();

      // Check whether the calling thread (i.e., the thread invoking ORB
      // shutdown) is one of the threads in the pool.  If it is, it must
      // not wait for itself to shut down.
      size_t size = this->activated_threads_.size ();

      for (size_t i = 0; i < size; ++i)
        {
          ACE_thread_t thr_id = 0;
          if (this->activated_threads_.get (thr_id, i) == 0
              && thr_id == my_thr_id)
            {
              num_waiting_threads = 1;
              break;
            }
        }

      // Wait until all worker threads have shutdown.
      while (this->num_threads_ != num_waiting_threads)
        {
          this->active_workers_.wait();
        }

      // Cancel all requests.
      TP_Cancel_Visitor cancel_visitor;
      this->queue_.accept_visitor(cancel_visitor);
    }

  return 0;
}
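
// Note on the flag semantics above: a zero flag means a worker thread
// is closing itself (the thread count is decremented and any waiters
// are signaled), while a non-zero flag means the owning strategy object
// is shutting the task down: intake stops, all waiting workers are
// woken, the call blocks until the workers have exited (if the caller
// is itself a pool thread, it waits only for the other workers), and
// any requests still in the queue are cancelled.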



void
TAO::CSD::TP_Task::cancel_servant (PortableServer::Servant servant
                                   ACE_ENV_ARG_DECL)
{
  GuardType guard(this->lock_);

  // Cancel the requests targeted for the provided servant.
  TP_Cancel_Visitor cancel_visitor(servant);
  this->queue_.accept_visitor(cancel_visitor);
}

TAO_END_VERSIONED_NAMESPACE_DECL