summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/orbsvcs/ESF/ESF_Delayed_Changes.h
blob: 6b9bf11ff0a35e68d374c906c76add67f63a50eb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
/* -*- C++ -*- */
// $Id$
//
// ============================================================================
//
// = LIBRARY
//   ORBSVCS Event Service Framework
//
// = FILENAME
//   ESF_Delayed_Changes
//
// = AUTHOR
//   Carlos O'Ryan (coryan@cs.wustl.edu)
//
// = CREDITS
//   http://www.cs.wustl.edu/~coryan/EC/index.html
//
// ============================================================================

#ifndef TAO_ESF_DELAYED_CHANGES_H
#define TAO_ESF_DELAYED_CHANGES_H

#include "ESF_Proxy_Collection.h"

#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */

#include "ESF_Busy_Lock.h"
#include "ace/Containers.h"

// Forward declarations of the command templates.  Within this header
// they are only named in the typedefs of TAO_ESF_Delayed_Changes, so
// their full definitions are not required here.
template<class Target,class Object> class TAO_ESF_Connected_Command;
template<class Target,class Object> class TAO_ESF_Disconnected_Command;
template<class Target,class Object> class TAO_ESF_Reconnected_Command;
template<class Target> class TAO_ESF_Shutdown_Command;

template<class PROXY, class COLLECTION, class ITERATOR, ACE_SYNCH_DECL>
class TAO_ESF_Delayed_Changes : public TAO_ESF_Proxy_Collection<PROXY>
{
  // = TITLE
  //   TAO_ESF_Delayed_Changes
  //
  // = DESCRIPTION
  //   This class implements the Delayed Operations protocol to solve
  //   the concurrency challenges outlined in the documentation of
  //   TAO_ESF_Proxy_Collection.
  //   In short, the class delays changes by putting them on an
  //   "operation queue": the operations are stored as command objects
  //   in this queue and executed once the system is quiescent
  //   (i.e. no threads are iterating over the collection).
  //
  //   The algorithm implemented so far is:
  //   - If a thread is using the set then it increases the busy
  //     count; this is done by calling the busy() method.  Once the
  //     thread has stopped using the collection the idle() method is
  //     invoked and the busy count is decreased.
  //     A helper class (Busy_Lock) is used to hide this protocol
  //     behind the familiar GUARD idiom.
  //   - If the busy count reaches the busy_hwm then the thread must
  //     wait until the count reaches 0 again.
  //     This can be used to control the maximum concurrency in the
  //     EC, matching it (for example) with the number of
  //     processors.  Setting the concurrency to a high value (say one
  //     million) allows for an arbitrary number of threads to execute
  //     concurrently.
  //   - If a modification is posted to the collection we need to
  //     execute it at some point.
  //     Just using the busy_hwm would not work: the HWM may never be
  //     reached, so another form of control is needed.
  //     Instead we use another counter that keeps track of how many
  //     threads have used the set since the modification was
  //     posted.  If this number of threads reaches max_write_delay
  //     then we don't allow any more threads to go in; eventually the
  //     thread count reaches 0 and we can proceed with the operations.
  //   - There is one aspect of concurrency that can be problematic: if
  //     a thread pushes events as part of an upcall then the same
  //     thread could be counted twice.  We need to keep track of the
  //     threads that are dispatching events and not increase (or
  //     decrease) the reference count when a thread iterates twice
  //     over the same set.
  //
  //   This solves the major problems, but there are other issues to
  //   be addressed:
  //   + How do we ensure that the operations are eventually executed?
  //   + How do we simplify the execution of the locking protocol for
  //     clients of this class?
  //   + How do we minimize overhead for single threaded execution?
  //   + How do we minimize the overhead for the cases where the
  //     threads dispatching events don't post changes to the
  //     collection?
  //
  // = TEMPLATE PARAMETERS
  //   PROXY      - the proxy type; the collection operations take
  //                PROXY pointers.
  //   COLLECTION - the type of the wrapped collection (collection_).
  //   ITERATOR   - not used by any declaration in this header;
  //                presumably the implementation uses it to traverse
  //                COLLECTION -- TODO confirm.
  //   ACE_SYNCH_DECL - the synchronization strategy; the types of
  //                busy_lock_ and busy_cond_ are spelled through the
  //                matching ACE_SYNCH_*_T macros.
  //
public:
  TAO_ESF_Delayed_Changes (void);
  TAO_ESF_Delayed_Changes (const COLLECTION &collection);
  // Constructors.  NOTE(review): the second form presumably
  // initializes collection_ from <collection>; the definitions are
  // not in this header -- confirm.

  int busy (void);
  int idle (void);
  // The busy/idle protocol described above: busy() is called by a
  // thread that starts using the set and increases the busy count,
  // idle() is called when it stops and decreases it.
  // NOTE(review): the meaning of the return values is not visible
  // here -- confirm against the implementation.

  int execute_delayed_operations (void);
  // NOTE(review): presumably runs the commands stored in
  // command_queue_; the definition is not in this header -- confirm.

  void connected_i (PROXY *proxy,
                    CORBA::Environment &ACE_TRY_ENV);
  void reconnected_i (PROXY *proxy,
                    CORBA::Environment &ACE_TRY_ENV);
  void disconnected_i (PROXY *proxy,
                     CORBA::Environment &ACE_TRY_ENV);
  void shutdown_i (CORBA::Environment &ACE_TRY_ENV);
  // The "_i" counterparts of the virtual collection methods below.
  // NOTE(review): presumably these apply the change directly and
  // are the entry points invoked by the command objects (which take
  // this class as their Target) -- confirm.

  typedef TAO_ESF_Connected_Command<TAO_ESF_Delayed_Changes<PROXY,COLLECTION,ITERATOR,ACE_SYNCH_USE>,PROXY> Connected_Command;
  typedef TAO_ESF_Reconnected_Command<TAO_ESF_Delayed_Changes<PROXY,COLLECTION,ITERATOR,ACE_SYNCH_USE>,PROXY> Reconnected_Command;
  typedef TAO_ESF_Disconnected_Command<TAO_ESF_Delayed_Changes<PROXY,COLLECTION,ITERATOR,ACE_SYNCH_USE>,PROXY> Disconnected_Command;
  typedef TAO_ESF_Shutdown_Command<TAO_ESF_Delayed_Changes<PROXY,COLLECTION,ITERATOR,ACE_SYNCH_USE> > Shutdown_Command;
  // Short names for the command templates instantiated with this
  // class as the Target (and PROXY as the Object where applicable).

  // = The TAO_ESF_Proxy_Collection methods
  virtual void for_each (TAO_ESF_Worker<PROXY> *worker,
                         CORBA::Environment &ACE_TRY_ENV);
  virtual void connected (PROXY *proxy,
                          CORBA::Environment &ACE_TRY_ENV);
  virtual void reconnected (PROXY *proxy,
                            CORBA::Environment &ACE_TRY_ENV);
  virtual void disconnected (PROXY *proxy,
                             CORBA::Environment &ACE_TRY_ENV);
  virtual void shutdown (CORBA::Environment &ACE_TRY_ENV);

private:
  COLLECTION collection_;
  // The wrapped collection.

  typedef TAO_ESF_Busy_Lock_Adapter<TAO_ESF_Delayed_Changes<PROXY,COLLECTION,ITERATOR,ACE_SYNCH_USE> > Busy_Lock;

  Busy_Lock lock_;
  // The helper that hides the busy()/idle() protocol behind the
  // GUARD idiom (see the class description).

  ACE_SYNCH_MUTEX_T busy_lock_;
  // NOTE(review): presumably serializes access to the counters
  // below -- confirm against the implementation.

  ACE_SYNCH_CONDITION_T busy_cond_;
  // NOTE(review): presumably used by threads that must wait for the
  // busy count to drop (see the class description) -- confirm.

  CORBA::ULong busy_count_;
  // The busy count of the protocol described above, i.e. the
  // count increased by busy() and decreased by idle().

  CORBA::ULong write_delay_count_;
  // The counter that tracks how many threads have used the set
  // since a modification was posted (see the class description).

  CORBA::ULong busy_hwm_;
  CORBA::ULong max_write_delay_;
  // Control variables for the concurrency policies.

  ACE_Unbounded_Queue<ACE_Command_Base*> command_queue_;
  // The "operation queue": the delayed changes, stored as pointers
  // to command objects.
};

// ****************************************************************

#if defined (__ACE_INLINE__)
#include "ESF_Delayed_Changes.i"
#endif /* __ACE_INLINE__ */

#if defined (ACE_TEMPLATES_REQUIRE_SOURCE)
#include "ESF_Delayed_Changes.cpp"
#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */

#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA)
#pragma implementation ("ESF_Delayed_Changes.cpp")
#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */

#endif /* TAO_ESF_DELAYED_CHANGES_H */