1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
|
/* -----------------------------------------------------------------------------
*
* (c) The GHC Team 1998-2005
*
* Prototypes for functions in Schedule.c
* (RTS internal scheduler interface)
*
* -------------------------------------------------------------------------*/
#ifndef SCHEDULE_H
#define SCHEDULE_H
#include "OSThreads.h"
#include "Capability.h"
/* initScheduler(), exitScheduler()
 *
 * Scheduler start-up and shutdown entry points.
 * Called from STG : no
 * Locks assumed   : none
 *
 * exitScheduler() takes a flag, wait_foreign.
 * NOTE(review): presumably it selects whether shutdown waits for
 * outstanding foreign calls -- confirm against Schedule.c.
 *
 * freeScheduler() is declared here without further documentation.
 * NOTE(review): presumably it releases scheduler resources after
 * exitScheduler() -- confirm against Schedule.c.
 */
void initScheduler (void);
void exitScheduler (rtsBool wait_foreign);
void freeScheduler (void);
// scheduleThread(): place a new thread (tso) on the run queue of the
// current Capability (cap).
void scheduleThread (Capability *cap, StgTSO *tso);
// scheduleThreadOn(): place a new thread (tso) on the run queue of a
// specified Capability.  cap is the currently owned Capability, cpu is
// the number of the desired Capability.
void scheduleThreadOn(Capability *cap, StgWord cpu, StgTSO *tso);
/* awakenBlockedQueue()
 *
 * Takes a pointer to the beginning of a blocked TSO queue, and
 * wakes up the entire queue.
 * Called from STG : yes
 * Locks assumed   : none
 *
 * The signature depends on the build flavour: the GRAN and PAR
 * variants take a blocking-queue element plus a closure, while the
 * default variant takes the owning Capability and the first TSO.
 */
#if defined(GRAN)
void awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node);
#elif defined(PAR)
void awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node);
#else
void awakenBlockedQueue (Capability *cap, StgTSO *tso);
#endif
/* wakeUpRts()
 *
 * Causes an OS thread to wake up and run the scheduler, if necessary.
 * Takes no arguments and returns nothing.
 */
void wakeUpRts(void);
/* unblockOne()
 *
 * Put the specified thread (tso) on the run queue of the given
 * Capability (cap).
 * Called from STG : yes
 * Locks assumed   : we own the Capability.
 *
 * Returns an StgTSO pointer.
 * NOTE(review): presumably the next thread in the queue tso was
 * taken from -- confirm against Schedule.c.
 */
StgTSO * unblockOne (Capability *cap, StgTSO *tso);
/* raiseExceptionHelper()
 *
 * Takes the register table, the thread and the exception closure, and
 * returns an StgWord.
 * NOTE(review): the meaning of the result is not visible from this
 * header; see the definition in Schedule.c.
 */
StgWord raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception);
/* findRetryFrameHelper()
 *
 * Takes a thread and returns an StgWord.
 * NOTE(review): judging by the name this locates a retry frame on
 * tso's stack -- confirm against Schedule.c.
 */
StgWord findRetryFrameHelper (StgTSO *tso);
/* workerStart()
 *
 * Entry point for a new worker task; task is the Task structure for
 * that worker.
 * Called from STG : NO
 * Locks assumed   : none
 */
void workerStart(Task *task);
/* Declarations whose signatures depend on the build flavour.
 *
 *   GRAN    : blocking queues are made of StgBlockingQueueElement, and
 *             initThread() takes an extra priority argument.
 *   PAR     : blocking queues are made of StgBlockingQueueElement;
 *             run_queue_len() is declared here as well.
 *   default : blocking queues are plain StgTSO lists; the info_type*()
 *             functions are marked as dummies.
 */
#if defined(GRAN)
void awaken_blocked_queue(StgBlockingQueueElement *q, StgClosure *node);
void unlink_from_bq(StgTSO* tso, StgClosure* node);
void initThread(StgTSO *tso, nat stack_size, StgInt pri);
#elif defined(PAR)
nat run_queue_len(void);
void awaken_blocked_queue(StgBlockingQueueElement *q, StgClosure *node);
void initThread(StgTSO *tso, nat stack_size);
#else
char *info_type(StgClosure *closure); // dummy
char *info_type_by_ip(StgInfoTable *ip); // dummy
void awaken_blocked_queue(StgTSO *q);
void initThread(StgTSO *tso, nat stack_size);
#endif
/* Context switch flag.
 *
 * Declared as a plain int.
 * Locks required : none (conflicts are harmless)
 */
extern int RTS_VAR(context_switch);
/* The state of the scheduler. This is used to control the sequence
 * of events during shutdown, and when the runtime is interrupted
 * using ^C.
 *
 * NOTE(review): sched_state is declared rtsBool below, yet the SCHED_*
 * constants give it three possible values (0..2).  An integer type
 * looks more appropriate; any change must be made together with the
 * definition -- confirm against Schedule.c.
 */
#define SCHED_RUNNING 0 /* running as normal */
#define SCHED_INTERRUPTING 1 /* ^C detected, before threads are deleted */
#define SCHED_SHUTTING_DOWN 2 /* final shutdown */
extern rtsBool RTS_VAR(sched_state);
/*
 * Values for the recent_activity flag, which tracks whether we have
 * done any execution in this time slice.
 */
#define ACTIVITY_YES 0 /* there has been activity in the current slice */
#define ACTIVITY_MAYBE_NO 1 /* no activity in the current slice */
#define ACTIVITY_INACTIVE 2 /* a complete slice has passed with no activity */
#define ACTIVITY_DONE_GC 3 /* like 2, but we've done a GC too */
/* Recent activity flag.
 * Locks required : Transition from MAYBE_NO to INACTIVE
 * happens in the timer signal, so it is atomic. Transition from
 * INACTIVE to DONE_GC happens under sched_mutex. No lock required
 * to set it to ACTIVITY_YES.
 */
extern nat recent_activity;
/* Thread queues.
 * Locks required : sched_mutex
 *
 * In GranSim we have one run/blocked_queue per PE, so none of the
 * queues below are declared in a GRAN build.
 *
 * Otherwise blackhole_queue is always declared, while the blocked
 * queue (head and tail) and the sleeping queue are only declared for
 * the non-threaded RTS (!THREADED_RTS).
 */
#if defined(GRAN)
// run_queue_hds defined in GranSim.h
#else
extern StgTSO *RTS_VAR(blackhole_queue);
#if !defined(THREADED_RTS)
extern StgTSO *RTS_VAR(blocked_queue_hd), *RTS_VAR(blocked_queue_tl);
extern StgTSO *RTS_VAR(sleeping_queue);
#endif
#endif
/* Linked list of all threads; all_threads points at its first TSO.
 * Locks required : sched_mutex
 */
extern StgTSO *RTS_VAR(all_threads);
/* blackholes_need_checking
 *
 * Set to rtsTrue if there are threads on the blackhole_queue, and
 * it is possible that one or more of them may be available to run.
 * This flag is set to rtsFalse after we've checked the queue, and
 * set to rtsTrue just before we run some Haskell code.  It is used
 * to decide whether we should yield the Capability or not.
 * Locks required : none (see scheduleCheckBlackHoles()).
 */
extern rtsBool blackholes_need_checking;
/* The scheduler mutex.  Only declared in the threaded RTS. */
#if defined(THREADED_RTS)
extern Mutex RTS_VAR(sched_mutex);
#endif
/* rts_mainLazyIO(): takes a Haskell object p and returns a
 * SchedulerStatus; ret is annotated as an out-parameter.
 * NOTE(review): presumably evaluates p as the program's main action
 * and stores its result through ret -- confirm against Schedule.c.
 */
SchedulerStatus rts_mainLazyIO(HaskellObj p, /*out*/HaskellObj *ret);
/* Called by shutdown_handler(). */
void interruptStgRts (void);
/* NOTE(review): run_queue_len() is also declared in the PAR branch
 * earlier in this header; the two declarations are identical.
 */
nat run_queue_len (void);
/* NOTE(review): the three functions here are undocumented in this
 * header; see their definitions for the exact contracts.
 */
void resurrectThreads (StgTSO *);
void printAllThreads(void);
/* Debugging aids.
 * print_bq() is only declared in DEBUG builds; print_bqe() is only
 * declared in PAR builds.
 */
#ifdef DEBUG
void print_bq (StgClosure *node);
#endif
#if defined(PAR)
void print_bqe (StgBlockingQueueElement *bqe);
#endif
/* -----------------------------------------------------------------------------
* Some convenient macros/inline functions...
*/
#if !IN_STG_CODE
/* END_TSO_QUEUE and friends now defined in includes/StgMiscClosures.h */
/* appendToRunQueue()
 *
 * Link tso in as the new last element of cap's run queue.
 *
 * NOTE: tso->_link must be END_TSO_QUEUE on entry (asserted).
 * ASSUMES: cap->running_task is the current task.
 */
INLINE_HEADER void
appendToRunQueue (Capability *cap, StgTSO *tso)
{
    ASSERT(tso->_link == END_TSO_QUEUE);
    if (cap->run_queue_hd != END_TSO_QUEUE) {
        /* Non-empty queue: chain tso after the current tail. */
        setTSOLink(cap, cap->run_queue_tl, tso);
    } else {
        /* Empty queue: tso becomes the head as well. */
        cap->run_queue_hd = tso;
    }
    /* Either way tso is now the last element. */
    cap->run_queue_tl = tso;
}
/* pushOnRunQueue()
 *
 * Insert tso at the front of cap's run queue.  Used for newly
 * awakened threads, so they get run as soon as possible.
 * ASSUMES: cap->running_task is the current task.
 */
INLINE_HEADER void
pushOnRunQueue (Capability *cap, StgTSO *tso)
{
    StgTSO *old_head = cap->run_queue_hd;

    /* tso goes in front of whatever was at the head before. */
    setTSOLink(cap, tso, old_head);
    if (cap->run_queue_tl == END_TSO_QUEUE) {
        /* No tail was set, so tso becomes the tail as well. */
        cap->run_queue_tl = tso;
    }
    cap->run_queue_hd = tso;
}
/* popRunQueue()
 *
 * Detach and return the thread at the head of cap's run queue.
 * The queue must not be empty (asserted).  The returned thread's
 * _link field is reset to END_TSO_QUEUE.
 */
INLINE_HEADER StgTSO *
popRunQueue (Capability *cap)
{
    StgTSO *first;
    StgTSO *rest;

    first = cap->run_queue_hd;
    ASSERT(first != END_TSO_QUEUE);

    rest = first->_link;
    first->_link = END_TSO_QUEUE; // no write barrier req'd
    cap->run_queue_hd = rest;
    if (rest == END_TSO_QUEUE) {
        /* That was the last element: clear the tail as well. */
        cap->run_queue_tl = END_TSO_QUEUE;
    }
    return first;
}
/* appendToBlockedQueue()
 *
 * Link tso in as the new last element of the global blocked queue
 * (blocked_queue_hd / blocked_queue_tl).  Non-threaded RTS only.
 * tso->_link must be END_TSO_QUEUE on entry (asserted).
 */
#if !defined(THREADED_RTS)
INLINE_HEADER void
appendToBlockedQueue(StgTSO *tso)
{
    ASSERT(tso->_link == END_TSO_QUEUE);
    if (blocked_queue_hd != END_TSO_QUEUE) {
        /* Non-empty queue: chain tso after the current tail. */
        setTSOLink(&MainCapability, blocked_queue_tl, tso);
    } else {
        /* Empty queue: tso becomes the head as well. */
        blocked_queue_hd = tso;
    }
    blocked_queue_tl = tso;
}
#endif
#if defined(THREADED_RTS)
/* appendToWakeupQueue()
 *
 * Link tso in as the new last element of cap's wakeup queue.
 * Threaded RTS only.  tso->_link must be END_TSO_QUEUE on entry
 * (asserted).
 */
INLINE_HEADER void
appendToWakeupQueue (Capability *cap, StgTSO *tso)
{
    ASSERT(tso->_link == END_TSO_QUEUE);
    if (cap->wakeup_queue_hd != END_TSO_QUEUE) {
        /* Non-empty queue: chain tso after the current tail. */
        setTSOLink(cap, cap->wakeup_queue_tl, tso);
    } else {
        /* Empty queue: tso becomes the head as well. */
        cap->wakeup_queue_hd = tso;
    }
    cap->wakeup_queue_tl = tso;
}
#endif
/* Queue-emptiness predicates.
 *
 * emptyQueue(): reports whether q is the END_TSO_QUEUE sentinel,
 * i.e. whether the queue headed by q holds no threads.
 */
INLINE_HEADER rtsBool
emptyQueue (StgTSO *q)
{
    rtsBool is_empty = (q == END_TSO_QUEUE);
    return is_empty;
}
/* emptyRunQueue(): reports whether cap's run queue holds no threads. */
INLINE_HEADER rtsBool
emptyRunQueue(Capability *cap)
{
    StgTSO *head = cap->run_queue_hd;
    return emptyQueue(head);
}
#if defined(THREADED_RTS)
/* emptyWakeupQueue(): reports whether cap's wakeup queue holds no
 * threads.  Threaded RTS only.
 */
INLINE_HEADER rtsBool
emptyWakeupQueue(Capability *cap)
{
    StgTSO *head = cap->wakeup_queue_hd;
    return emptyQueue(head);
}
#endif
/* Emptiness tests for the global blocked and sleeping queues.
 * Only defined for the non-threaded RTS; each expands to an
 * emptyQueue() call on the corresponding queue head.
 */
#if !defined(THREADED_RTS)
#define EMPTY_BLOCKED_QUEUE() (emptyQueue(blocked_queue_hd))
#define EMPTY_SLEEPING_QUEUE() (emptyQueue(sleeping_queue))
#endif
/* emptyThreadQueues()
 *
 * Reports whether there are no queued threads visible from cap.
 * In the threaded RTS only cap's run queue is checked; in the
 * non-threaded RTS the global blocked and sleeping queues must be
 * empty too.
 */
INLINE_HEADER rtsBool
emptyThreadQueues(Capability *cap)
{
#if defined(THREADED_RTS)
    return emptyRunQueue(cap);
#else
    return emptyRunQueue(cap)
        && EMPTY_BLOCKED_QUEUE()
        && EMPTY_SLEEPING_QUEUE();
#endif
}
#endif /* !IN_STG_CODE */
#endif /* SCHEDULE_H */
|