summaryrefslogtreecommitdiff
path: root/TAO/tao/Dynamic_TP/DTP_Task.h
blob: 73dfeff86510c043183e321ba031d4fd5853fb18 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
// -*- C++ -*-

//=============================================================================
/**
 *  @file    DTP_Task.h
 */
//=============================================================================

#ifndef TAO_DYNAMIC_TP_TASK_H
#define TAO_DYNAMIC_TP_TASK_H

#include /**/ "ace/pre.h"

#include "tao/orbconf.h"

#if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0

#include "tao/Dynamic_TP/dynamic_tp_export.h"
#include "tao/Dynamic_TP/DTP_Config.h"
#include "tao/CSD_ThreadPool/CSD_TP_Queue.h"
#include "tao/CSD_ThreadPool/CSD_TP_Request.h"
#include "tao/CSD_ThreadPool/CSD_TP_Dispatchable_Visitor.h"
#include "tao/PortableServer/PortableServer.h"
#include "tao/Condition.h"

#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */

#include "ace/Task.h"
#include "ace/Synch.h"
#include "ace/Containers_T.h"
#include "ace/Vector_T.h"

TAO_BEGIN_VERSIONED_NAMESPACE_DECL

/**
  * @class TAO_DTP_Task
  *
  * @brief Active Object managing a queue of request objects.
  *
  * There are two types of "users" of a TAO_DTP_Task object:
  *
  *    1) The TP_Strategy object that "owns" this task object.
  *    2) The worker threads that "run" this task object as an
  *       "active object".
  *
  * The TP_Strategy object that "owns" this task object dictates
  * when the worker threads are activated and when they are shutdown.  It
  * also injects requests into this task's queue via calls to the
  * add_request() method.  It is also the TP_Strategy object that
  * dictates the number of worker threads to be activated via a call to
  * the set_num_threads() method.
  *
  * The active object pattern is implemented via the use of the
  * ACE_Task_Base base class, and each worker thread will
  * invoke this task's svc() method, and when the svc() returns, the
  * worker thread will invoke this task's close() method (with the
  * flag argument equal to 0).
  *
  * @note NOTE(review): this class declares no set_num_threads() method
  *       and nothing named TP_Strategy is declared in this header; the
  *       thread counts are configured through the set_*_pool_threads()
  *       methods below.  The paragraph above presumably predates this
  *       class -- confirm and update the names.
  */
class TAO_Dynamic_TP_Export TAO_DTP_Task : public ACE_Task_Base
{
public:

  /// Default Constructor.
  TAO_DTP_Task();

  /// Virtual Destructor.
  virtual ~TAO_DTP_Task();

  /// Bundle holding a TAO_DTP_Definition.
  /// NOTE(review): presumably what open() expects to receive through its
  /// void* argument -- confirm against the implementation.
  struct Open_Args {
    TAO_DTP_Definition task_thread_config;
  };


  /// Put a request object on to the request queue.
  /// Returns true if successful, false otherwise (it has been "rejected").
  bool add_request(TAO::CSD::TP_Request* request);

  /// Activate the worker threads
  /// @param args opaque pointer, defaults to 0 (see Open_Args above).
  virtual int open(void* args = 0);

  /// The "mainline" executed by each worker thread.
  virtual int svc();

  /// Per the class description, each worker thread invokes this with
  /// flag equal to 0 after svc() returns.  The notes on active_workers_
  /// describe close(1) as a shutdown request that stays blocked until
  /// all of the worker threads have stopped running.
  virtual int close (u_long flag = 0);

  /// Set the thread and queue config.
  /// The definitions are not in this header; each setter presumably
  /// stores its argument in the correspondingly named private member
  /// declared below -- confirm against the implementation.

  void set_init_pool_threads(size_t thr_count);

  void set_min_pool_threads(size_t thr_count);

  void set_max_pool_threads(size_t thr_count);

  void set_thread_stack_size(size_t stack_sz);

  void set_thread_idle_time(ACE_Time_Value thr_timeout);

  void set_max_request_queue_depth(size_t queue_depth);

  /// Get the thread and queue config.
  /// The definitions are not in this header; each getter presumably
  /// reports the correspondingly named private member declared below.

  size_t get_init_pool_threads();

  size_t get_min_pool_threads();

  size_t get_max_pool_threads();

  size_t get_max_request_queue_depth();

  size_t get_thread_stack_size();

  /// NOTE(review): returns time_t although set_thread_idle_time() takes
  /// an ACE_Time_Value and thread_idle_time_ is stored as one; presumably
  /// this is the whole-seconds part -- confirm.
  time_t get_thread_idle_time();

  /// Cancel all requests that are targeted for the provided servant.
  void cancel_servant (PortableServer::Servant servant);

private:
  /// get the next available request. Return true if one available, nonblocking
  bool request_ready (TAO::CSD::TP_Dispatchable_Visitor &v,
                      TAO::CSD::TP_Request_Handle &r);

  /// release the request, reset the accepting flag if necessary
  void clear_request (TAO::CSD::TP_Request_Handle &r);

  /// Thread bookkeeping helpers; their definitions are not in this header.
  /// NOTE(review): presumably add_busy()/remove_busy() adjust busy_threads_
  /// and add_active()/remove_active() adjust active_count_, while
  /// need_active()/above_minimum() compare the counts against the
  /// configured pool limits -- confirm against the implementation.
  void add_busy (void);
  void remove_busy (void);
  void add_active (void);
  bool remove_active (bool);
  bool need_active (void);
  bool above_minimum (void);

  /// Mutex type used for every lock in this class.
  typedef TAO_SYNCH_MUTEX         LockType;
  /// Condition variable type paired with LockType.
  typedef TAO_Condition<LockType> ConditionType;

  /// Lock used to synchronize the "active_workers_" condition
  LockType aw_lock_;
  /// Lock used to synchronize manipulation of the queue
  LockType queue_lock_;
  /// Lock used to synchronize the "work_available_" condition
  LockType work_lock_;

  /// Condition used to signal worker threads that they may be able to
  /// find a request in the queue_ that needs to be dispatched to a
  /// servant that is currently "not busy".
  /// This condition will be signal()'ed each time a new request is
  /// added to the queue_, and also when a servant has become "not busy".
  /// NOTE(review): declared after the locks above; if the constructor
  /// binds the conditions to those locks, this declaration order must be
  /// kept -- confirm before reordering members.
  ConditionType work_available_;

  /// This condition will be signal()'ed each time the num_threads_
  /// data member has its value changed.  This is used to keep the
  /// close(1) invocation (ie, a shutdown request) blocked until all
  /// of the worker threads have stopped running.
  /// NOTE(review): no num_threads_ member is declared in this class;
  /// presumably active_count_ is meant -- confirm.
  ConditionType active_workers_;

  /// The number of threads that are currently active. This may be
  /// different than the total number of threads since the latter
  /// may include threads that are shutting down but not reaped.
  size_t active_count_;

  /// Flag used to indicate when this task will (or will not) accept
  /// requests via the add_request() method.
  bool accepting_requests_;

  /// Flag used to initiate a shutdown request to all worker threads.
  bool shutdown_;

  /// Flag to indicate something is on the queue. works in conjunction with
  /// the work_available_ condition
  bool check_queue_;

  /// Flag used to avoid multiple open() calls.
  bool opened_;

  /// The number of requests in the local queue.
  size_t num_queue_requests_;

  /// Atomic counter of busy worker threads.
  /// NOTE(review): the previous comment called these "active" threads,
  /// which duplicates active_count_; presumably this counts threads
  /// between add_busy() and remove_busy() -- confirm.
  ACE_Atomic_Op <TAO_SYNCH_MUTEX, unsigned long> busy_threads_;

  /// The queue of pending servant requests (a.k.a. the "request queue").
  TAO::CSD::TP_Queue queue_;

  /// The initial thread count for the pool (see set_init_pool_threads()).
  /// NOTE(review): presumably the number of threads started by open();
  /// the previous comment duplicated the min_pool_threads_ one -- confirm.
  size_t init_pool_threads_;

  /// The low water mark for dynamic threads to settle to.
  size_t min_pool_threads_;

  /// The high water mark for dynamic threads to be limited to.
  size_t max_pool_threads_;

  /// If the max_pool_threads_ value has been met, then ORB requests coming in can be queued.
  /// This is the maximum number that will be allowed.
  size_t max_request_queue_depth_;

  /// This is the memory stack size allowable for each thread.
  size_t thread_stack_size_;

  /// This is the maximum amount of time in seconds that an idle thread can
  /// stay alive before being taken out of the pool.
  ACE_Time_Value thread_idle_time_;
};

TAO_END_VERSIONED_NAMESPACE_DECL

#if defined (__ACE_INLINE__)
# include "tao/Dynamic_TP/DTP_Task.inl"
#endif /* __ACE_INLINE__ */

#endif /* (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0 */

#include /**/ "ace/post.h"

#endif /* TAO_DYNAMIC_TP_TASK_H */