1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
|
// $Id$
#ifndef DSRT_DIRECT_DISPATCHER_IMPL_T_CPP
#define DSRT_DIRECT_DISPATCHER_IMPL_T_CPP
#include "DSRT_Direct_Dispatcher_Impl_T.h"
#if !defined (__ACE_INLINE__)
//#include "DSRT_Direct_Dispatcher_Impl_T.i"
#endif /* __ACE_INLINE__ */
ACE_RCSID(Kokyu, DSRT_Direct_Dispatcher_Impl_T, "$Id$")
namespace Kokyu
{
/*
//@@VS: This is somehow not being recognized by MSVC, which results
//in a link error. For now, the definition has been moved to the .h
//file. Needs further investigation.
template <class DSRT_Scheduler_Traits>
int Comparator_Adapter_Generator<DSRT_Scheduler_Traits>::MoreEligible::
operator ()(const DSRT_Dispatch_Item_var<DSRT_Scheduler_Traits>& item1,
const DSRT_Dispatch_Item_var<DSRT_Scheduler_Traits>& item2)
{
int rc = qos_comparator_ (item1->qos (), item2->qos ());
//more eligible
if (rc == 1)
return 1;
//if equally eligible, then resolve tie with the creation time of
//the item
if (rc == 0 && item1->insertion_time () < item2->insertion_time ())
return 1;
return 0;
}
*/
template <class DSRT_Scheduler_Traits>
DSRT_Direct_Dispatcher_Impl<DSRT_Scheduler_Traits>::
DSRT_Direct_Dispatcher_Impl (ACE_Sched_Params::Policy sched_policy,
int sched_scope)
:DSRT_Dispatcher_Impl<DSRT_Scheduler_Traits>(sched_policy, sched_scope),
sched_queue_modified_ (0),
sched_queue_modified_cond_ (sched_queue_modified_cond_lock_)
{
//Run scheduler thread at highest priority
if (this->activate (this->rt_thr_flags_, 1, 0, this->executive_prio_) == -1)
{
ACE_ERROR ((LM_ERROR,
"(%t|%T) cannot activate scheduler thread in RT mode."
"Trying in non RT mode\n"));
if (this->activate (this->non_rt_thr_flags_) == -1)
ACE_ERROR ((LM_ERROR,
"(%t|%T) cannot activate scheduler thread\n"));
}
}
template <class DSRT_Scheduler_Traits> int
DSRT_Direct_Dispatcher_Impl<DSRT_Scheduler_Traits>::
init_i (const DSRT_ConfigInfo&)
{
return 0;
}
template <class DSRT_Scheduler_Traits> int
DSRT_Direct_Dispatcher_Impl<DSRT_Scheduler_Traits>::svc (void)
{
ACE_hthread_t scheduler_thr_handle;
ACE_Thread::self (scheduler_thr_handle);
#ifdef KOKYU_DSRT_LOGGING
int prio;
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("max prio=%d\n")
ACE_TEXT ("min prio=%d\n")
ACE_TEXT ("active prio=%d\n")
ACE_TEXT ("inactive prio=%d\n"),
max_prio_,
min_prio_,
active_prio_,
inactive_prio_));
if (ACE_OS::thr_getprio (scheduler_thr_handle, prio) == -1)
{
if (errno == ENOTSUP)
{
ACE_ERROR((LM_ERROR,
ACE_TEXT ("getprio not supported\n")
));
}
else
{
ACE_ERROR ((LM_ERROR,
ACE_TEXT ("%p\n")
ACE_TEXT ("thr_getprio failed")));
}
}
ACE_DEBUG ((LM_DEBUG, "(%t): Scheduler thread prio is %d\n", prio));
#endif /*DSRT_LOGGING*/
while(1)
{
ACE_GUARD_RETURN (cond_lock_t,
mon, sched_queue_modified_cond_lock_, 0);
if (this->shutdown_flagged_)
break;
while (!sched_queue_modified_)
{
#ifdef KOKYU_DSRT_LOGGING
ACE_DEBUG ((LM_DEBUG,
"(%t): sched thread about to wait on cv\n"));
#endif
sched_queue_modified_cond_.wait ();
}
#ifdef KOKYU_DSRT_LOGGING
ACE_DEBUG ((LM_DEBUG, "(%t): sched thread done waiting on cv\n"));
#endif
sched_queue_modified_ = 0;
ACE_Guard<ACE_SYNCH_RECURSIVE_MUTEX> synch_lock_mon(this->synch_lock_);
if (this->ready_queue_.current_size () <= 0)
continue;
#ifdef KOKYU_DSRT_LOGGING
ACE_DEBUG ((LM_DEBUG, "(%t|%T):Sched Queue contents===>\n"));
this->ready_queue_.dump ();
#endif
DSRT_Dispatch_Item_var<DSRT_Scheduler_Traits> item_var;
this->ready_queue_.most_eligible (item_var);
ACE_hthread_t most_eligible_thr_handle = item_var->thread_handle ();
#ifdef KOKYU_DSRT_LOGGING
ACE_DEBUG ((LM_DEBUG,
"(%t|%T):curr scheduled thr handle = %d\n",
this->curr_scheduled_thr_handle_));
ACE_DEBUG ((LM_DEBUG,
"(%t|%T):most eligible thr handle = %d \n",
most_eligible_thr_handle));
#endif
if (this->curr_scheduled_thr_handle_ != most_eligible_thr_handle)
{
if (this->curr_scheduled_thr_handle_ != 0)
{
if (ACE_OS::thr_setprio (this->curr_scheduled_thr_handle_,
this->inactive_prio_,
this->sched_policy_) == -1)
{
ACE_ERROR ((LM_ERROR,
ACE_TEXT ("%p\n"),
ACE_TEXT ("thr_setprio on curr_scheduled_thr_handle_ failed.")));
ACE_DEBUG ((LM_DEBUG, "thr_handle = %d, prio = %d\n",
this->curr_scheduled_thr_handle_,
this->inactive_prio_));
}
}
if (ACE_OS::thr_setprio (most_eligible_thr_handle,
this->active_prio_, this->sched_policy_) == -1)
{
ACE_ERROR ((LM_ERROR,
ACE_TEXT ("%p\n"),
ACE_TEXT ("thr_setprio on most_eligible_thr_handle failed")));
}
this->curr_scheduled_thr_handle_ = most_eligible_thr_handle;
this->curr_scheduled_guid_ = item_var->guid ();
}
}
#ifdef KOKYU_DSRT_LOGGING
ACE_DEBUG ((LM_DEBUG, "(%t): sched thread exiting\n"));
#endif
return 0;
}
template <class DSRT_Scheduler_Traits>
int DSRT_Direct_Dispatcher_Impl<DSRT_Scheduler_Traits>::
schedule_i (Guid_t id, const DSRT_QoSDescriptor& qos)
{
ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, guard, this->synch_lock_, -1);
#ifdef KOKYU_DSRT_LOGGING
ACE_DEBUG ((LM_DEBUG,
"(%t|%T):schedule_i enter\n"));
#endif
DSRT_Dispatch_Item<DSRT_Scheduler_Traits>* item;
ACE_hthread_t thr_handle;
ACE_Thread::self (thr_handle);
ACE_NEW_RETURN (item,
DSRT_Dispatch_Item<DSRT_Scheduler_Traits> (id, qos),
-1);
item->thread_handle (thr_handle);
if (this->ready_queue_.insert (item) == -1)
return -1;
#ifdef KOKYU_DSRT_LOGGING
ACE_DEBUG ((LM_DEBUG,
"(%t|%T):schedule_i after ready_q.insert\n"));
#endif
if (ACE_OS::thr_setprio (thr_handle,
this->blocked_prio_,
this->sched_policy_) == -1)
{
ACE_ERROR_RETURN ((LM_ERROR,
ACE_TEXT ("%p\n"),
ACE_TEXT ("thr_setprio failed")), -1);
}
#ifdef KOKYU_DSRT_LOGGING
ACE_DEBUG ((LM_DEBUG,
"(%t|%T):schedule_i after thr_setprio\n"));
#endif
//ready_queue_.dump ();
//@@ Perhaps the lock could be moved further down just before
//setting the condition variable?
ACE_GUARD_RETURN (cond_lock_t,
mon, this->sched_queue_modified_cond_lock_, 0);
#ifdef KOKYU_DSRT_LOGGING
ACE_DEBUG ((LM_DEBUG,
"(%t|%T):schedule_i after acquiring cond lock\n"));
#endif
this->sched_queue_modified_ = 1;
this->sched_queue_modified_cond_.signal ();
#ifdef KOKYU_DSRT_LOGGING
ACE_DEBUG ((LM_DEBUG,
"(%t|%T):schedule_i exit\n"));
#endif
return 0;
}
template <class DSRT_Scheduler_Traits>
int DSRT_Direct_Dispatcher_Impl<DSRT_Scheduler_Traits>::
update_schedule_i (Guid_t guid, const DSRT_QoSDescriptor& qos)
{
return this->schedule (guid, qos);
}
template <class DSRT_Scheduler_Traits>
int DSRT_Direct_Dispatcher_Impl<DSRT_Scheduler_Traits>::
update_schedule_i (Guid_t guid, Block_Flag_t flag)
{
ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, guard, this->synch_lock_, -1);
#ifdef KOKYU_DSRT_LOGGING
ACE_DEBUG ((LM_DEBUG, "(%t): update schedule for block entered\n"));
#endif
DSRT_Dispatch_Item_var<DSRT_Scheduler_Traits> dispatch_item;
ACE_hthread_t thr_handle;
//@@ Perhaps the lock could be got rid of. It looks like the state
//of this object is not getting modified here. It makes calls to
//other methods, which already are thread-safe.
//ACE_Guard<cond_lock_t> mon(sched_queue_modified_cond_lock_);
int found = this->ready_queue_.find (guid, dispatch_item);
if (found == 0 && flag == BLOCK)
{
thr_handle = dispatch_item->thread_handle ();
if (ACE_OS::thr_setprio (thr_handle,
this->blocked_prio_,
this->sched_policy_) == -1)
{
ACE_ERROR ((LM_ERROR,
ACE_TEXT ("%p\n"),
ACE_TEXT ("thr_setprio failed")));
}
//monitor released because cancel_schedule would acquire the
//lock. Using recursive mutex creates lock up.
//
//@@ Need to investigate this further. Also we can consider
//using the Thread-Safe interface pattern.
//mon.release ();
int rc = this->cancel_schedule (guid);
#ifdef KOKYU_DSRT_LOGGING
ACE_DEBUG ((LM_DEBUG, "(%t): update schedule for block done\n"));
#endif
return rc;
}
#ifdef KOKYU_DSRT_LOGGING
ACE_DEBUG ((LM_DEBUG, "(%t): update schedule for block done\n"));
#endif
return -1;
}
template <class DSRT_Scheduler_Traits> int
DSRT_Direct_Dispatcher_Impl<DSRT_Scheduler_Traits>::
cancel_schedule_i (Guid_t guid)
{
ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, guard, this->synch_lock_, -1);
#ifdef KOKYU_DSRT_LOGGING
ACE_DEBUG ((LM_DEBUG, "(%t): about to remove guid\n"));
#endif
this->ready_queue_.remove (guid);
#ifdef KOKYU_DSRT_LOGGING
this->ready_queue_.dump ();
#endif
if (this->curr_scheduled_guid_ == guid)
{
this->curr_scheduled_guid_ = 0;
this->curr_scheduled_thr_handle_ = 0;
}
ACE_GUARD_RETURN (cond_lock_t,
mon, this->sched_queue_modified_cond_lock_, 0);
this->sched_queue_modified_ = 1;
this->sched_queue_modified_cond_.signal ();
return 0;
}
template <class DSRT_Scheduler_Traits> int
DSRT_Direct_Dispatcher_Impl<DSRT_Scheduler_Traits>::
shutdown_i ()
{
this->shutdown_flagged_ = 1;
ACE_Guard<cond_lock_t> mon(this->sched_queue_modified_cond_lock_);
this->sched_queue_modified_ = 1;
this->sched_queue_modified_cond_.signal ();
// We have to wait until the scheduler executive thread shuts
// down. But we have acquired the lock and if we wait without
// releasing it, the scheduler thread will try to acquire it after
// it gets woken up by the above signal and it fails to acquire the
// lock. This will lead to a deadlock. So release the lock before we
// wait.
mon.release ();
this->wait ();
return 0;
}
}
#endif /* DSRT_DIRECT_DISPATCHER_IMPL_T_CPP */
|