summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/orbsvcs/Event/EC_Proxy_Collection.h
blob: 3ca9c10df9cc4950402b22f586ea1f3e5aa4fa87 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
/* -*- C++ -*- */
// $Id$
//
// ============================================================================
//
// = LIBRARY
//   ORBSVCS Real-time Event Channel
//
// = FILENAME
//   EC_Proxy_Collection
//
// = AUTHOR
//   Carlos O'Ryan (coryan@cs.wustl.edu)
//
// = CREDITS
//   http://www.cs.wustl.edu/~coryan/EC/index.html
//
// ============================================================================

#ifndef TAO_EC_PROXY_COLLECTION_H
#define TAO_EC_PROXY_COLLECTION_H
#include "ace/pre.h"

#include "EC_Busy_Lock.h"

#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */

#include "tao/corba.h"
#include "ace/Containers.h"

template<class Target> class TAO_EC_Worker;
// Forward declaration of the function object ("Worker") type that
// for_each() applies to each member of a collection.  The template
// itself is defined elsewhere.

template<class PROXY>
class TAO_EC_Proxy_Collection
{
  // = TITLE
  //   EC_Proxy_Collection
  //
  // = DESCRIPTION
  //   Abstract interface for a collection of proxies of type PROXY.
  //
  //   Many components in the Event Channel need to keep a collection
  //   of proxies; these collections must be able to cope with several
  //   concurrency issues:
  //   + Some threads may need to iterate over the collection and
  //     invoke a method on each element.  Locking the collection
  //     while this is done is not feasible in all cases: under some
  //     configurations the same thread that is iterating over the
  //     collection may need to make changes to the set.
  //   + A recursive lock does not solve the concurrency problems
  //     because recursive changes to the collection still invalidate
  //     the iterators.
  //
  //   There are several solutions to this problem (see the VARIANTS
  //   section), and there is no single one that works best in all
  //   cases.  As usual, we wish to strategize the protocol used to
  //   serialize iterations and changes to the collection.  This class
  //   encapsulates that protocol.
  //
  //   The reasons are too long to enumerate here, but the Iterator
  //   pattern is not appropriate in this case.  We use Function
  //   Objects (aka Worker) to perform a client-defined task on each
  //   member.
  //
  //   Regular member functions are used to insert, remove and update
  //   members of the collection and to shutdown (i.e. perform final
  //   cleanup operations).
  //
  //   The class must also collaborate with other components of the
  //   EC to efficiently and safely perform memory management of the
  //   members in the collection.
  //
  // = VARIANTS
  //
  //   We identify several sources of variation:
  //
  //   + Immediate_Changes: in this variant the iteration is performed
  //   while holding some kind of synchronization primitive, such as a
  //   thread mutex, a recursive mutex, a RW lock, etc.
  //   This is only useful in configurations where a separate thread
  //   dispatches the events, and thus, can only be used with real
  //   locks.
  //
  //   + Copy_On_Read: before performing the iteration the collection
  //   is duplicated into a temporary array.  Thus no locks are held
  //   during the iteration.  This is a very expensive approach, but
  //   useful in many cases.
  //   The kind of lock is also strategized in this case.
  //
  //   + Copy_On_Write: this is very similar to the previous approach,
  //   but the collection is only duplicated when a change is required
  //   while some thread is performing an iteration.  The iteration
  //   continues over the original copy, while the changes are
  //   performed in the duplicate.  The new copy of the collection is
  //   used for any subsequent operations, the original is discarded
  //   when the last thread using it completes its work.
  //   This approach optimizes for the case where no changes are
  //   required while an iteration is in progress, because no copy is
  //   made in that case.
  //   NOTE: no class in this header implements this variant.
  //
  //   + Delayed_Changes: before starting the iteration a counter is
  //   incremented, this counter is used to keep track of the number
  //   of threads concurrently using the collection.
  //   If a thread wants to perform a change to the collection it must
  //   first verify that there are no threads iterating over it.  If
  //   there are any threads then the thread queues the modification
  //   for later execution, using the Command pattern.
  //   The kind of lock is strategized, as this approach is used in
  //   single threaded configurations.
  //   There are two main variations:
  //     - An upcall can result in new dispatches: in this case we
  //       have to keep track of the list of current threads using
  //       a Set, to avoid dead-locks.
  //       IMPLEMENTATION: the design is not complete, probably
  //       similar to the next one.
  //     - Otherwise we just need to control the concurrency using the
  //       algorithm described below.
  //
  //
  // = MEMORY MANAGEMENT
  //   It assumes ownership of the proxies added to the collection,
  //   it increases the reference count.
  //
  // = LOCKING
  //   Locking is provided by derived classes.
  //
  // = TODO
  //
public:
  virtual ~TAO_EC_Proxy_Collection (void);
  // Virtual destructor, so that concrete collections can be destroyed
  // through a pointer to this interface.

  virtual void for_each (TAO_EC_Worker<PROXY> *worker,
                         CORBA::Environment &ACE_TRY_ENV) = 0;
  // Iterate over the collection and invoke worker->work() for each
  // member of the collection.
  // How the iteration is serialized with respect to concurrent
  // changes is decided by the derived class (see the VARIANTS
  // section in the class description).
  // NOTE(review): <ACE_TRY_ENV> is presumably used to report CORBA
  // exceptions, here and in the methods below; confirm against the
  // implementation.

  virtual void connected (PROXY *proxy,
                          CORBA::Environment &ACE_TRY_ENV) = 0;
  // Insert a new element into the collection.  The collection assumes
  // ownership of the element.

  virtual void reconnected (PROXY *proxy,
                            CORBA::Environment &ACE_TRY_ENV) = 0;
  // Insert an element into the collection.  No errors can be raised
  // if the element is already present.
  // The collection assumes ownership, i.e. must invoke
  // <proxy->_decr_refcnt()> if the element is already present in the
  // collection.

  virtual void disconnected (PROXY *proxy,
                             CORBA::Environment &ACE_TRY_ENV) = 0;
  // Remove an element from the collection.

  virtual void shutdown (CORBA::Environment &ACE_TRY_ENV) = 0;
  // The EC is shutting down, must release all the elements.
};

// ****************************************************************

template<class PROXY, class COLLECTION, class ITERATOR, class ACE_LOCK>
class TAO_EC_Immediate_Changes : public TAO_EC_Proxy_Collection<PROXY>
{
  // = TITLE
  //   TAO_EC_Immediate_Changes
  //
  // = DESCRIPTION
  //   Implement the Immediate_Changes protocol described in the
  //   VARIANTS section of TAO_EC_Proxy_Collection.
  //   The class is parametric on the proxy type (PROXY), the kind of
  //   collection used to store the proxies (COLLECTION) and the
  //   locking mechanism (ACE_LOCK).
  //   NOTE(review): ITERATOR is presumably the iterator type used to
  //   traverse COLLECTION; it is not referenced in this declaration,
  //   confirm against the implementation file.
  //
public:
  TAO_EC_Immediate_Changes (void);
  TAO_EC_Immediate_Changes (const COLLECTION &collection);
  // Constructors.  The second form receives an existing collection;
  // presumably it is used to initialize <collection_> (confirm in
  // the implementation file).

  // = The TAO_EC_Proxy methods
  // Overrides of the pure virtual interface declared in
  // TAO_EC_Proxy_Collection; see that class for the contract of each
  // method.
  virtual void for_each (TAO_EC_Worker<PROXY> *worker,
                         CORBA::Environment &ACE_TRY_ENV);
  virtual void connected (PROXY *proxy,
                          CORBA::Environment &ACE_TRY_ENV);
  virtual void reconnected (PROXY *proxy,
                            CORBA::Environment &ACE_TRY_ENV);
  virtual void disconnected (PROXY *proxy,
                             CORBA::Environment &ACE_TRY_ENV);
  virtual void shutdown (CORBA::Environment &ACE_TRY_ENV);

private:
  COLLECTION collection_;
  // The underlying collection of proxies.

  ACE_LOCK lock_;
  // The synchronization primitive for this collection.  Per the
  // Immediate_Changes description it is presumably held while
  // iterating and while changing <collection_>; the actual usage is
  // in the implementation file.
};

// ****************************************************************

template<class PROXY, class COLLECTION, class ITERATOR, class ACE_LOCK>
class TAO_EC_Copy_On_Read : public TAO_EC_Proxy_Collection<PROXY>
{
  // = TITLE
  //   TAO_EC_Copy_On_Read
  //
  // = DESCRIPTION
  //   Implement the Copy_On_Read protocol described in the VARIANTS
  //   section of TAO_EC_Proxy_Collection.
  //   The class is parametric on the kind of collection (COLLECTION)
  //   and locking mechanism (ACE_LOCK) used; PROXY is the type of the
  //   elements.
  //   NOTE(review): ITERATOR is presumably the iterator type used to
  //   traverse COLLECTION; it is not referenced in this declaration,
  //   confirm against the implementation file.
  //
  // = TODO
  //
public:
  TAO_EC_Copy_On_Read (void);
  TAO_EC_Copy_On_Read (const COLLECTION &collection);
  // Constructors.  The second form receives an existing collection;
  // presumably it is used to initialize <collection_> (confirm in
  // the implementation file).

  // = The TAO_EC_Proxy methods
  // Overrides of the pure virtual interface declared in
  // TAO_EC_Proxy_Collection; see that class for the contract of each
  // method.
  virtual void for_each (TAO_EC_Worker<PROXY> *worker,
                         CORBA::Environment &ACE_TRY_ENV);
  virtual void connected (PROXY *proxy,
                          CORBA::Environment &ACE_TRY_ENV);
  virtual void reconnected (PROXY *proxy,
                            CORBA::Environment &ACE_TRY_ENV);
  virtual void disconnected (PROXY *proxy,
                             CORBA::Environment &ACE_TRY_ENV);
  virtual void shutdown (CORBA::Environment &ACE_TRY_ENV);

private:
  COLLECTION collection_;
  // The underlying collection of proxies.

  ACE_LOCK lock_;
  // The synchronization primitive for this collection.  Per the
  // Copy_On_Read description no lock is held during the iteration
  // itself, so this is presumably held only while copying or changing
  // <collection_>; the actual usage is in the implementation file.
};

// ****************************************************************

// Forward declarations of the command object templates used by
// TAO_EC_Delayed_Changes (see the typedefs in that class).  The
// templates themselves are defined elsewhere.
template<class Target,class Object> class TAO_EC_Connected_Command;
template<class Target,class Object> class TAO_EC_Disconnected_Command;
template<class Target,class Object> class TAO_EC_Reconnected_Command;
template<class Target> class TAO_EC_Shutdown_Command;

template<class PROXY, class COLLECTION, class ITERATOR, ACE_SYNCH_DECL>
class TAO_EC_Delayed_Changes : public TAO_EC_Proxy_Collection<PROXY>
{
  // = TITLE
  //   TAO_EC_Delayed_Changes
  //
  // = DESCRIPTION
  //   This class implements the Delayed Operations protocol to solve
  //   the concurrency challenges outlined in the documentation of
  //   TAO_EC_Proxy_Collection.
  //   In short the class delays changes by putting them on an
  //   "operation queue", the operations are stored as command objects
  //   in this queue and executed once the system is quiescent
  //   (i.e. no threads are iterating over the collection).
  //
  //   The algorithm implemented so far is:
  //   - If a thread is using the set then it increases the busy
  //     count, this is done by calling the busy() method. Once the
  //     thread has stopped using the collection the idle() method is
  //     invoked and the busy count is decreased.
  //     A helper class (Busy_Lock) is used to hide this protocol
  //     behind the familiar GUARD idiom.
  //   - If the busy count reaches the busy_hwm then the thread must
  //     wait until the count reaches 0 again.
  //     This can be used to control the maximum concurrency in the
  //     EC, matching it (for example) with the number of
  //     processors. Setting the concurrency to a high value (say one
  //     million) allows for an arbitrary number of threads to execute
  //     concurrently.
  //   - If a modification is posted to the collection we need to
  //     execute it at some point.
  //     Just using the busy_hwm would not work, the HWM may not be
  //     reached ever, so another form of control is needed.
  //     Instead we use another counter, that keeps track of how many
  //     threads have used the set since the modification was
  //     posted. If this number of threads reaches max_write_delay then
  //     we don't allow any more threads to go in, eventually the
  //     thread count reaches 0 and we can proceed with the operations.
  //
  //   - There is one aspect of concurrency that can be problematic: if
  //     a thread pushes events as part of an upcall then the same
  //     thread could be counted twice, we need to keep track of the
  //     threads that are dispatching events and not increase (or
  //     decrease) the reference count when a thread iterates twice
  //     over the same set.
  //
  //   This solves the major problems, but there are other issues to
  //   be addressed:
  //   + How do we ensure that the operations are eventually executed?
  //   + How do we simplify the execution of the locking protocol for
  //   clients of this class?
  //   + How do we minimize overhead for single threaded execution?
  //   + How do we minimize the overhead for the cases where the
  //     threads dispatching events don't post changes to the
  //     collection?
  //
  //   NOTE(review): ITERATOR is presumably the iterator type used to
  //   traverse COLLECTION, and the ACE_SYNCH_DECL argument presumably
  //   selects the mutex and condition variable types used below;
  //   confirm against the ACE documentation and the implementation.
  //
public:
  TAO_EC_Delayed_Changes (void);
  TAO_EC_Delayed_Changes (const COLLECTION &collection);
  // Constructors.  The second form receives an existing collection;
  // presumably it is used to initialize <collection_> (confirm in
  // the implementation file).

  int busy (void);
  int idle (void);
  int execute_delayed_operations (void);
  // The busy/idle protocol described above: busy() is called when a
  // thread starts using the collection (increasing the busy count)
  // and idle() when it stops (decreasing it).
  // execute_delayed_operations() runs the queued changes.
  // NOTE(review): the meaning of the <int> return values is not
  // visible in this header; confirm in the implementation file.

  void connected_i (PROXY *proxy,
                    CORBA::Environment &ACE_TRY_ENV);
  void reconnected_i (PROXY *proxy,
                    CORBA::Environment &ACE_TRY_ENV);
  void disconnected_i (PROXY *proxy,
                     CORBA::Environment &ACE_TRY_ENV);
  void shutdown_i (CORBA::Environment &ACE_TRY_ENV);
  // Counterparts of connected(), reconnected(), disconnected() and
  // shutdown().  Presumably these apply the change to the collection
  // directly and are the operations invoked by the command objects
  // below once the change can be executed; confirm in the
  // implementation file.

  typedef TAO_EC_Connected_Command<TAO_EC_Delayed_Changes<PROXY,COLLECTION,ITERATOR,ACE_SYNCH_USE>,PROXY> Connected_Command;
  typedef TAO_EC_Reconnected_Command<TAO_EC_Delayed_Changes<PROXY,COLLECTION,ITERATOR,ACE_SYNCH_USE>,PROXY> Reconnected_Command;
  typedef TAO_EC_Disconnected_Command<TAO_EC_Delayed_Changes<PROXY,COLLECTION,ITERATOR,ACE_SYNCH_USE>,PROXY> Disconnected_Command;
  typedef TAO_EC_Shutdown_Command<TAO_EC_Delayed_Changes<PROXY,COLLECTION,ITERATOR,ACE_SYNCH_USE> > Shutdown_Command;
  // Command object types, one per kind of change, instantiated with
  // this class as their target.

  // = The TAO_EC_Proxy methods
  // Overrides of the pure virtual interface declared in
  // TAO_EC_Proxy_Collection; see that class for the contract of each
  // method.
  virtual void for_each (TAO_EC_Worker<PROXY> *worker,
                         CORBA::Environment &ACE_TRY_ENV);
  virtual void connected (PROXY *proxy,
                          CORBA::Environment &ACE_TRY_ENV);
  virtual void reconnected (PROXY *proxy,
                            CORBA::Environment &ACE_TRY_ENV);
  virtual void disconnected (PROXY *proxy,
                             CORBA::Environment &ACE_TRY_ENV);
  virtual void shutdown (CORBA::Environment &ACE_TRY_ENV);

private:
  COLLECTION collection_;
  // The underlying collection of proxies.

  typedef TAO_EC_Busy_Lock_Adapter<TAO_EC_Delayed_Changes<PROXY,COLLECTION,ITERATOR,ACE_SYNCH_USE> > Busy_Lock;
  // The helper mentioned in the class description: it hides the
  // busy()/idle() protocol behind the GUARD idiom.

  Busy_Lock lock_;
  // The Busy_Lock adapter instance for this collection.

  ACE_SYNCH_MUTEX_T busy_lock_;
  // Mutex; presumably it protects the counters below (confirm in the
  // implementation file).

  ACE_SYNCH_CONDITION_T busy_cond_;
  // Condition variable; presumably used by threads that must wait
  // until the busy count reaches 0 (confirm in the implementation
  // file).

  CORBA::ULong busy_count_;
  // The busy count: number of threads currently using the collection.

  CORBA::ULong write_delay_count_;
  // Counter of how many threads have used the set since a
  // modification was posted (see the class description).

  CORBA::ULong busy_hwm_;
  CORBA::ULong max_write_delay_;
  // Control variables for the concurrency policies: the high water
  // mark for the busy count and the limit for the write delay count.

  ACE_Unbounded_Queue<ACE_Command_Base*> command_queue_;
  // The "operation queue": delayed changes, stored as command
  // objects.
};

// ****************************************************************

#if defined (__ACE_INLINE__)
#include "EC_Proxy_Collection.i"
#endif /* __ACE_INLINE__ */

#if defined (ACE_TEMPLATES_REQUIRE_SOURCE)
#include "EC_Proxy_Collection.cpp"
#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */

#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA)
#pragma implementation ("EC_Proxy_Collection.cpp")
#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */

#include "ace/post.h"
#endif /* TAO_EC_PROXY_COLLECTION_H */