1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
|
/* --------------------------------------------------------------------------
Time-stamp: <Wed Mar 21 2001 16:37:16 Stardate: [-30]6363.46 hwloidl>
Initialising the parallel RTS
An extension based on Kevin Hammond's GRAPH for PVM version
P. Trinder, January 17th 1995.
Adapted for the new RTS
P. Trinder, July 1997.
H-W. Loidl, November 1999.
------------------------------------------------------------------------ */
#ifdef PAR /* whole file */
//@menu
//* Includes::
//* Global variables::
//* Initialisation Routines::
//@end menu
//@node Includes, Global variables
//@subsection Includes
/* Evidently not Posix */
/* #include "PosixSource.h" */
#include <setjmp.h>
#include "Rts.h"
#include "RtsFlags.h"
#include "RtsUtils.h"
#include "ParallelRts.h"
#include "Sparks.h"
#include "LLC.h"
#include "HLC.h"
//@node Global variables, Initialisation Routines, Includes
//@subsection Global variables
/* Global conditions defined here. */
/* Set to rtsTrue by startupParallelSystem when argv[0] starts with '-'. */
rtsBool IAmMainThread = rtsFalse; /* Set for the main thread */
/* Task identifiers for various interesting global tasks. */
/* SysManTask is parsed from argv[0] on the main PE and taken from
   pvm_parent() otherwise; mytid comes from pvm_mytid()
   (see startupParallelSystem).  IOTask is not assigned in this file. */
GlobalTaskId IOTask = 0, /* The IO Task Id */
SysManTask = 0, /* The System Manager Task Id */
mytid = 0; /* This PE's Task Id */
rtsTime main_start_time; /* When the program started */
rtsTime main_stop_time; /* When the program finished */
jmp_buf exit_parallel_system; /* How to abort from the RTS */
//rtsBool fishing = rtsFalse; /* We have no fish out in the stream */
rtsTime last_fish_arrived_at = 0; /* Time of arrival of most recent fish*/
nat outstandingFishes = 0; /* Number of active fishes */
//@cindex spark queue
/* GranSim: a globally visible array of spark queues */
/* NOTE(review): pending_sparks_lim / pending_sparks_base carry no comment in
   the original; presumably the upper limit and the base of each pool's
   storage -- confirm against the spark pool implementation. */
rtsSpark *pending_sparks_hd[SPARK_POOLS], /* ptr to start of a spark pool */
*pending_sparks_tl[SPARK_POOLS], /* ptr to end of a spark pool */
*pending_sparks_lim[SPARK_POOLS],
*pending_sparks_base[SPARK_POOLS];
//@cindex spark_limit
/* max number of sparks permitted on the PE;
see RtsFlags.ParFlags.maxLocalSparks */
nat spark_limit[SPARK_POOLS];
//@cindex PendingFetches
/* A list of fetch reply messages not yet processed; this list is filled
by awaken_blocked_queue and processed by processFetches */
StgBlockedFetch *PendingFetches = END_BF_QUEUE;
//@cindex allPEs
/* Table of PE task ids: allocated in startPEComms with room for MAX_PES
   entries and filled in by processPEtids. */
GlobalTaskId *allPEs;
//@cindex nPEs
/* Number of PEs: first read from the command line in startupParallelSystem,
   then reset and recounted by processPEtids while it registers the table. */
nat nPEs = 0;
//@cindex sparksIgnored
/* Statistics counters; within this file they are only mentioned by the
   disabled (#if 0) statistics line in shutdownParallelSystem. */
nat sparksIgnored = 0, sparksCreated = 0,
threadsIgnored = 0, threadsCreated = 0;
//@cindex advisory_thread_count
/* Not referenced anywhere else in this file. */
nat advisory_thread_count = 0;
/* NOTE(review): purpose not evident from this file; it is only defined here. */
globalAddr theGlobalFromGA;
/* For flag handling see RtsFlags.h */
//@node Prototypes
//@subsection Prototypes
/* Needed for FISH messages (initialisation of random number generator).
   Hand-written prototypes for the two libc routines that initParallelSystem
   uses to seed the generator.
   NOTE(review): these would normally come from <stdlib.h> and <time.h>;
   presumably declared by hand because this file is deliberately not built
   as POSIX source (see the Includes section) -- confirm before replacing. */
void srand48 (long);
time_t time (time_t *);
//@node Initialisation Routines, , Global variables
//@subsection Initialisation Routines
/*
  Terminate the parallel system on this PE.

  n is the exit code.  It is passed unchanged to stopPEComms; here it is
  only inspected to choose the debug message (0 = normal, otherwise error).
  After communications have been stopped, ShutdownEachPEHook is run.

  The original note at this point speaks of par_exit:
  par_exit defines how to terminate the program.  If the exit code is
  non-zero (i.e. an error has occurred), the PE should not halt until
  outstanding error messages have been processed.  Otherwise, messages
  might be sent to non-existent Task Ids.  The infinite loop will actually
  terminate, since STG_Exception will call myexit(0) when it receives a
  PP_FINISH from the system manager task.
*/
//@cindex shutdownParallelSystem
void
shutdownParallelSystem(StgInt n)
{
  IF_PAR_DEBUG(verbose,
	       if (n==0)
	         belch("==== entered shutdownParallelSystem ...");
	       else
	         belch("==== entered shutdownParallelSystem (ERROR %d)...", n);
	       );

  stopPEComms(n);

#if 0
  {
    /* use the file specified via -S */
    /* Declared inside the disabled region: this is its only user, and
       keeping it outside left an unused local in every build. */
    FILE *sf = RtsFlags.GcFlags.statsFile;

    if (sf!=(FILE*)NULL)
      fprintf(sf, "PE %x: %u sparks created, %u sparks Ignored, %u threads created, %u threads Ignored",
	      (W_) mytid, sparksCreated, sparksIgnored,
	      threadsCreated, threadsIgnored);
  }
#endif

  ShutdownEachPEHook();
}
//@cindex initParallelSystem
/*
 * Local (per-PE) initialisation of the parallel RTS:
 *   - switch stdout and stderr to unbuffered mode,
 *   - seed the random-number generator with time * pid,
 *   - set up the pack buffer, the additional buffers and the spark pools.
 * If any of the three set-up routines returns zero, barf() is called with
 * the name of that routine.
 */
void
initParallelSystem(void)
{
  long fish_seed;

  /* Standard channels are left unbuffered */
  setbuf(stdout, NULL);
  setbuf(stderr, NULL);

  /* Seed of the random-number generator that is used to select the
     target of a FISH message */
  fish_seed = time(NULL) * getpid();
  srand48(fish_seed);

  if (!InitPackBuffer()) {
    barf("InitPackBuffer");
  }

  if (!initMoreBuffers()) {
    barf("initMoreBuffers");
  }

  if (!initSparkPools()) {
    barf("initSparkPools");
  }
}
/*
 * synchroniseSystem synchronises the reduction task with the system
 * manager, and initialises the Global address tables (LAGA & GALA).
 *
 * Steps, in the order coded below:
 *   1. announce the run on stderr (unconditionally, not only when debugging),
 *   2. run the per-PE initialisation hook,
 *   3. initialise the global address tables,
 *   4. do the local initialisation (initParallelSystem),
 *   5. start PE communications (startPEComms), which exchanges
 *      PP_READY / PP_PETIDS with SysMan and digests the PE table.
 */
//@cindex synchroniseSystem
void
synchroniseSystem(void)
{
/* Only in debug mode? */
/* nPEs printed here still holds the value read from the command line in
   startupParallelSystem; processPEtids (reached via startPEComms below)
   recounts it afterwards.
   NOTE(review): nPEs is a nat but is printed with %d -- presumably nat is
   unsigned; confirm and consider %u. */
fprintf(stderr, "==== Starting parallel execution on %d processors ...\n", nPEs);
InitEachPEHook(); /* HWL: hook to be execed on each PE */
/* Initialize global address tables */
initGAtables();
initParallelSystem();
startPEComms();
}
/*
  Do the startup stuff (this is PVM specific!).
  Determines global vars: mytid, IAmMainThread, SysManTask, nPEs
  Called at the beginning of RtsStartup.startupHaskell

  argv layout as read by this function:
    - main PE:   argv[0] is "-<hex SysMan task id>"; that flag is stripped
                 (locally) before the PE count is read.
    - other PEs: argv[0] does not start with '-'; SysMan is the PVM parent.
    - in both cases the PE count is taken from argv[1] of the (possibly
      shifted) vector.

  Malformed or missing arguments are reported on stderr and the affected
  global keeps its previous value; the function never dereferences a NULL
  argv entry.
*/
void
startupParallelSystem(char *argv[]) {
  unsigned int sysman_id = 0;    /* %x requires an unsigned int target */

  mytid = pvm_mytid();           /* Connect to PVM */

  if (argv[0] != NULL && *argv[0] == '-') {
    /* Look to see whether we're the Main Thread */
    IAmMainThread = rtsTrue;
    /* extract SysMan task ID; a zero field width ("%0X") is not a valid
       conversion specification, so plain %x is used */
    if (sscanf(argv[0], "-%x", &sysman_id) == 1) {
      SysManTask = sysman_id;
    } else {
      fprintf(stderr, "==== [%x] cannot parse SysMan task id from \"%s\"\n",
	      mytid, argv[0]);
    }
    argv++;                      /* Strip off flag argument */
  } else {
    SysManTask = pvm_parent();
  }

  IF_PAR_DEBUG(verbose,
	       fprintf(stderr, "==== [%x] %s PE located SysMan at %x\n",
		       mytid, IAmMainThread?"Main":"Remote", SysManTask));

  /* argv[1] may only be inspected if argv[0] is not already the
     terminating NULL entry */
  if (argv[0] != NULL && argv[1] != NULL) {
    nPEs = atoi(argv[1]);
  } else {
    fprintf(stderr, "==== [%x] missing PE count argument\n", mytid);
  }
}
/*
  Exception handler during startup.

  Passed to waitForPEOp by startPEComms to deal with packets other than the
  awaited one.  The opcode and sender are extracted from p, then:
    PP_FISH   -> bounceFish()
    PP_REVAL  -> bounceReval()          (DIST builds only)
    PP_FINISH -> stg_exit(EXIT_SUCCESS)
    otherwise -> a diagnostic on stderr; the packet is otherwise ignored.

  Always returns NULL.  The original fell off the end of this non-void
  function, leaving the returned value undefined for any caller using it.
*/
void *
processUnexpectedMessageDuringStartup(rtsPacket p) {
  OpCode opCode;
  GlobalTaskId sender_id;

  getOpcodeAndSender(p, &opCode, &sender_id);

  switch(opCode) {
  case PP_FISH:
    bounceFish();
    break;
#if defined(DIST)
  case PP_REVAL:
    bounceReval();
    break;
#endif
  case PP_FINISH:
    stg_exit(EXIT_SUCCESS);
    break;
  default:
    fprintf(stderr,"== Task %x: Unexpected OpCode %x (%s) from %x in startPEComms\n",
	    mytid, opCode, getOpName(opCode), sender_id);
    break;
  }

  return NULL;
}
/*
 * Bring up communications for this PE and obtain the PE table:
 * start the PE, allocate the allPEs table, report PP_READY (with the
 * IAmMainThread flag) to SysMan, wait for SysMan's PP_PETIDS message and
 * hand the received table to processPEtids.
 */
void
startPEComms(void)
{
  startUpPE();

  /* Room for MAX_PES task ids; the entries are written by processPEtids */
  allPEs = (GlobalTaskId *) stgMallocBytes(sizeof(*allPEs) * MAX_PES,
					   "(PEs)");

  /* Tell SysMan who we are: our tid travels with the message, the
     IAmMainThread flag is its argument */
  sendOp1(PP_READY, SysManTask, (StgWord)IAmMainThread);

  /* Block until SysMan delivers the PE-Id table; anything else arriving
     meanwhile goes to processUnexpectedMessageDuringStartup */
  waitForPEOp(PP_PETIDS, SysManTask, processUnexpectedMessageDuringStartup);

  IF_PAR_DEBUG(verbose,
	       belch("==-- startPEComms: methinks we just received a PP_PETIDS message"));

  /* Unpack and register the table we were sent */
  processPEtids();
}
/*
 * Digest the PE table received from SysMan (called by startPEComms after
 * the PP_PETIDS message has arrived).
 *
 * Reads the number of entries, then one task id per entry; every new id is
 * stored in allPEs, counted in nPEs and passed to registerTask.  nPEs is
 * reset to 0 on entry, so the whole table is (re)registered on each call.
 *
 * A table with more than MAX_PES entries is rejected with barf(): allPEs
 * only has room for MAX_PES ids (see startPEComms), and the ASSERT on the
 * count offers no protection in builds where assertions are disabled.
 */
void
processPEtids(void) {
  long newPE;
  nat i, sentPEs, currentPEs;

  nPEs=0;

  /* Always 0 as the code stands, so the consistency branch (i<currentPEs)
     in the loop below is never taken */
  currentPEs = nPEs;

  IF_PAR_DEBUG(verbose,
	       belch("==-- processPEtids: starting to iterate over a PVM buffer"));

  /* ToDo: this has to go into LLComms !!! */
  /* NOTE(review): sentPEs is a nat while the ids below are unpacked into a
     long; confirm that GetArgs writes an object of nat's size here. */
  GetArgs(&sentPEs,1);
  ASSERT(sentPEs > currentPEs);
  ASSERT(sentPEs < MAX_PES); /* enforced by SysMan too*/

  /* Hard bound for builds without assertions: never write past allPEs */
  if (sentPEs > MAX_PES)
    barf("processPEtids: PE table with %lu entries does not fit (MAX_PES is %lu)",
	 (unsigned long)sentPEs, (unsigned long)MAX_PES);

  for (i = 0; i < sentPEs; i++) {
    GetArgs(&newPE,1);
    if (i<currentPEs) {
      ASSERT(newPE == allPEs[i]);
    } else {
#if defined(DIST)
      // breaks with PAR && !DEBUG
      /* newPE is a long, hence %lx rather than %x */
      IF_PAR_DEBUG(verbose,
		   fprintf(stderr, "[%x] registering %d'th %lx\n", mytid, i, newPE));
      if(!looks_like_tid(newPE))
	barf("unacceptable taskID %lx\n",newPE);
#endif
      allPEs[i] = newPE;
      nPEs++;
      registerTask(newPE);
    }
  }

  IF_PAR_DEBUG(verbose,
	       /* debugging */
	       belch("++++ [%x] PE table as I see it:", mytid);
	       for (i = 0; i < sentPEs; i++) {
		 belch("++++ allPEs[%d] = %x", i, allPEs[i]);
	       });
}
/*
 * Shut down communications for this PE.  n is the exit code.
 *
 * Order of the handshake, as coded:
 *   1. only if n is non-zero: send PP_READY to SysMan first,
 *   2. send PP_FINISH to SysMan with n as both arguments,
 *   3. wait for SysMan's PP_FINISH (no handler for unexpected messages is
 *      supplied -- the third argument is NULL),
 *   4. flush gr_file and shut the PE down.
 */
void
stopPEComms(StgInt n) {
if (n != 0) {
/* In case sysman doesn't know about us yet...
pvm_initsend(PvmDataDefault);
PutArgs(&IAmMainThread,1);
pvm_send(SysManTask, PP_READY);
*/
/* The sendOp call below stands in for the raw PVM sequence quoted above;
   unlike that sequence (and unlike startPEComms) it sends no
   IAmMainThread argument. */
sendOp(PP_READY, SysManTask);
}
sendOp2(PP_FINISH, SysManTask, n, n);
waitForPEOp(PP_FINISH, SysManTask, NULL);
/* NOTE(review): if gr_file can be NULL at this point, fflush(NULL) flushes
   every open output stream rather than failing -- confirm this is intended. */
fflush(gr_file);
shutDownPE();
}
#endif /* PAR -- whole file */
//@index
//* PendingFetches:: @cindex\s-+PendingFetches
//* SynchroniseSystem:: @cindex\s-+SynchroniseSystem
//* allPEs:: @cindex\s-+allPEs
//* initParallelSystem:: @cindex\s-+initParallelSystem
//* nPEs:: @cindex\s-+nPEs
//* par_exit:: @cindex\s-+par_exit
//* spark queue:: @cindex\s-+spark queue
//* sparksIgnored:: @cindex\s-+sparksIgnored
//@end index
|