1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
|
/* -*- C++ -*- */
// $Id$
// ============================================================================
//
// = LIBRARY
// ace
//
// = FILENAME
// TP_Reactor.h
//
// = DESCRIPTION
// The <ACE_TP_Reactor> (aka, Thread Pool Reactor) uses the
// Leader/Followers pattern to demultiplex events among a pool of
// threads. When using a thread pool reactor, an application
// pre-spawns a _fixed_ number of threads. When these threads
// invoke the <ACE_TP_Reactor>'s <handle_events> method, one thread
// will become the leader and wait for an event. The other
// follower threads will queue up waiting for their turn to become
// the leader. When an event occurs, the current leader will pick a
// follower to become the new leader and then go on to handle the
// event itself.
// The consequence of using <ACE_TP_Reactor> is the amortization of
// the cost of creating threads. The context switching cost is
// also reduced. Moreover, the total resources used by threads are
// bounded because there is a fixed number of threads.
//
// = AUTHOR
// Irfan Pyarali <irfan@cs.wustl.edu> and Nanbor Wang <nanbor@cs.wustl.edu>
//
// ============================================================================
#ifndef ACE_TP_REACTOR_H
#include "ace/pre.h"
#define ACE_TP_REACTOR_H
#include "ace/Select_Reactor.h"
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */
class ACE_Export ACE_EH_Dispatch_Info
{
// = TITLE
//
// This structure contains information about the activated event
// handler.
//
// = DESCRIPTION
// Groups together a handle, the event handler associated with it,
// a reactor mask and a callback of type <ACE_EH_PTMF>, plus an
// integer <dispatch_> flag. All data members are public. The
// member functions are only declared here; their definitions are
// not visible in this file.
public:
ACE_EH_Dispatch_Info (void);
// Default constructor. NOTE(review): the initial member values are
// set in the out-of-line definition -- confirm there.
void set (ACE_HANDLE handle,
ACE_Event_Handler *event_handler,
ACE_Reactor_Mask mask,
ACE_EH_PTMF callback);
// Presumably stores <handle>, <event_handler>, <mask> and
// <callback> in the corresponding data members below and marks the
// information as pending via <dispatch_> -- TODO confirm against
// the definition.
void reset (void);
// Presumably clears the stored dispatch information. The values
// the members are reset to are not visible here -- TODO confirm.
int dispatch (void) const;
// Const query. NOTE(review): presumably reports the <dispatch_>
// flag, i.e. whether there is an event handler waiting to be
// dispatched -- confirm against the definition.
ACE_HANDLE handle_;
// Handle associated with the activated event handler.
ACE_Event_Handler *event_handler_;
// The activated event handler. NOTE(review): looks like a
// non-owning pointer -- confirm.
ACE_Reactor_Mask mask_;
// Reactor mask associated with the activation.
ACE_EH_PTMF callback_;
// Callback to be invoked on <event_handler_>.
int dispatch_;
// Integer flag. NOTE(review): presumably non-zero when this object
// holds a handler that still has to be dispatched -- confirm.
};
class ACE_Export ACE_TP_Reactor : public ACE_Select_Reactor
{
// = TITLE
// Specialization of Select Reactor to support thread-pool based
// event dispatching.
//
// = DESCRIPTION
// One of the shortcomings of the Select_Reactor in ACE is that
// it did not support a thread pool based event dispatching
// model, similar to the one in WFMO_Reactor. In
// Select_Reactor, only one thread can be blocked in
// <handle_events> at any given time.
//
// A new Reactor has been added to ACE that removes this
// shortcoming. TP_Reactor is a specialization of Select
// Reactor to support thread-pool based event dispatching. This
// Reactor takes advantage of the fact that events reported by
// <select> are persistent if not acted upon immediately. It
// works by remembering the event handler that just got
// activated, releasing the internal lock (so that some other
// thread can start waiting in the event loop) and then
// dispatching the event handler outside the context of the
// Reactor lock.
//
// This Reactor is best suited for situations when the callbacks
// to event handlers can take arbitrarily long and/or a number
// of threads are available to run the event loops.
//
// Note that callback code in Event Handlers
// (e.g. Event_Handler::handle_input) does not have to be
// modified or made thread-safe for this Reactor. This is
// because an activated Event Handler is suspended in the
// Reactor before the upcall is made and resumed after the
// upcall completes. Therefore, one Event Handler cannot be
// called by multiple threads simultaneously.
public:
// = Initialization and termination methods.
ACE_TP_Reactor (ACE_Sig_Handler * = 0,
ACE_Timer_Queue * = 0,
int mask_signals = 1);
// Initialize <ACE_TP_Reactor> with the default size. The two
// unnamed pointer arguments are the signal handler and the timer
// queue; both default to 0. NOTE(review): the meaning of
// <mask_signals> is not documented in this header -- see the
// implementation and <ACE_Select_Reactor>.
ACE_TP_Reactor (size_t max_number_of_handles,
int restart = 0,
ACE_Sig_Handler * = 0,
ACE_Timer_Queue * = 0,
int mask_signals = 1);
// Initialize the <ACE_TP_Reactor> to manage
// <max_number_of_handles>. If <restart> is non-0 then the
// <ACE_Reactor>'s <handle_events> method will be restarted
// automatically when <EINTR> occurs. If the (unnamed) signal
// handler or timer queue arguments are non-0 they are used as the
// signal handler and timer queue, respectively. NOTE(review): the
// meaning of <mask_signals> is not documented in this header.
// = Event loop drivers.
virtual int handle_events (ACE_Time_Value *max_wait_time = 0);
// This event loop driver blocks for up to <max_wait_time> before
// returning. It will return earlier if timer events, I/O events,
// or signal events occur. Note that <max_wait_time> can be 0, in
// which case this method blocks indefinitely until events occur.
//
// <max_wait_time> is decremented to reflect how much time this call
// took. For instance, if a time value of 3 seconds is passed to
// handle_events and an event occurs after 2 seconds,
// <max_wait_time> will equal 1 second. This can be used if an
// application wishes to handle events for some fixed amount of
// time.
//
// Returns the total number of <ACE_Event_Handler>s that were
// dispatched, 0 if the <max_wait_time> elapsed without dispatching
// any handlers, or -1 if something goes wrong.
virtual int handle_events (ACE_Time_Value &max_wait_time);
// Overload of <handle_events> that takes the timeout by reference
// instead of by pointer. NOTE(review): presumably behaves like the
// pointer version with a non-0 <max_wait_time> -- confirm against
// the implementation.
virtual int mask_ops (ACE_Event_Handler *eh,
ACE_Reactor_Mask mask,
int ops);
// GET/SET/ADD/CLR the dispatch mask "bit" bound with the <eh> and
// <mask>. <ops> selects which of these operations is performed.
virtual int mask_ops (ACE_HANDLE handle,
ACE_Reactor_Mask mask,
int ops);
// GET/SET/ADD/CLR the dispatch mask "bit" bound with the <handle>
// and <mask>. <ops> selects which of these operations is performed.
static void no_op_sleep_hook (void *);
// Called from <handle_events>. NOTE(review): the name suggests a
// sleep hook that does nothing -- confirm in the implementation.
virtual void wakeup_all_threads (void);
// Wake up all threads waiting in the event loop.
ACE_ALLOC_HOOK_DECLARE;
// Declare the dynamic allocation hooks.
protected:
// = Internal methods that do the actual work.
virtual int dispatch_io_set (int number_of_active_handles,
int& number_dispatched,
int mask,
ACE_Handle_Set& dispatch_mask,
ACE_Handle_Set& ready_mask,
ACE_EH_PTMF callback);
// Overrides <ACE_Select_Reactor::dispatch_io_set> to *not*
// dispatch any event handlers. The information of one activated
// event handler is stored away, so that the event handler can be
// dispatched later.
virtual void notify_handle (ACE_HANDLE handle,
ACE_Reactor_Mask mask,
ACE_Handle_Set &,
ACE_Event_Handler *eh,
ACE_EH_PTMF callback);
// This method shouldn't get called.
virtual int notify_handle (ACE_EH_Dispatch_Info &dispatch_info);
// Notify the appropriate callback in the context of the event
// handler associated with the handle recorded in <dispatch_info>
// that a particular event has occurred.
ACE_EH_Dispatch_Info dispatch_info_;
// Dispatch information of the activated event handler.
private:
ACE_TP_Reactor (const ACE_TP_Reactor &);
ACE_TP_Reactor &operator = (const ACE_TP_Reactor &);
// Copy construction and assignment are declared private to deny
// access, since member-wise copying won't work.
};
#if defined (__ACE_INLINE__)
#include "ace/TP_Reactor.i"
#endif /* __ACE_INLINE__ */
#include "ace/post.h"
#endif /* ACE_TP_REACTOR_H */
|