Diffstat (limited to 'rts/parallel/HLComms.c')
-rw-r--r--  rts/parallel/HLComms.c  1810
1 files changed, 1810 insertions, 0 deletions
diff --git a/rts/parallel/HLComms.c b/rts/parallel/HLComms.c
new file mode 100644
index 0000000000..b0982e441c
--- /dev/null
+++ b/rts/parallel/HLComms.c
@@ -0,0 +1,1810 @@
+/* ----------------------------------------------------------------------------
+ * Time-stamp: <Wed Mar 21 2001 16:34:41 Stardate: [-30]6363.45 hwloidl>
+ *
+ * High Level Communications Routines (HLComms.lc)
+ *
+ * Contains the high-level routines (i.e. communication
+ * subsystem independent) used by GUM
+ *
+ * GUM 0.2x: Phil Trinder, Glasgow University, 12 December 1994
+ * GUM 3.xx: Phil Trinder, Simon Marlow July 1998
+ * GUM 4.xx: H-W. Loidl, Heriot-Watt University, November 1999 -
+ *
+ * ------------------------------------------------------------------------- */
+
+#ifdef PAR /* whole file */
+
+//@node High Level Communications Routines, , ,
+//@section High Level Communications Routines
+
+//@menu
+//* Macros etc::
+//* Includes::
+//* GUM Message Sending and Unpacking Functions::
+//* Message-Processing Functions::
+//* GUM Message Processor::
+//* Miscellaneous Functions::
+//* Index::
+//@end menu
+
+//@node Macros etc, Includes, High Level Communications Routines, High Level Communications Routines
+//@subsection Macros etc
+
+/* Evidently not Posix */
+/* #include "PosixSource.h" */
+
+//@node Includes, GUM Message Sending and Unpacking Functions, Macros etc, High Level Communications Routines
+//@subsection Includes
+
+#include "Rts.h"
+#include "RtsUtils.h"
+#include "RtsFlags.h"
+#include "Storage.h" // for recordMutable
+#include "HLC.h"
+#include "Parallel.h"
+#include "GranSimRts.h"
+#include "ParallelRts.h"
+#include "Sparks.h"
+#include "FetchMe.h" // for BLOCKED_FETCH_info etc
+#if defined(DEBUG)
+# include "ParallelDebug.h"
+#endif
+#include "StgMacros.h" // inlined IS_... fcts
+
+#ifdef DIST
+#include "SchedAPI.h" //for createIOThread
+extern unsigned int context_switch;
+#endif /* DIST */
+
+//@node GUM Message Sending and Unpacking Functions, Message-Processing Functions, Includes, High Level Communications Routines
+//@subsection GUM Message Sending and Unpacking Functions
+
+/*
+ * GUM Message Sending and Unpacking Functions
+ */
+
+/*
+ * Allocate space for message processing
+ */
+
+//@cindex gumPackBuffer
+static rtsPackBuffer *gumPackBuffer;
+
+//@cindex initMoreBuffers
+rtsBool
+initMoreBuffers(void)
+{
+ if ((gumPackBuffer = (rtsPackBuffer *)stgMallocWords(RtsFlags.ParFlags.packBufferSize,
+ "initMoreBuffers")) == NULL)
+ return rtsFalse;
+ return rtsTrue;
+}
+
+/*
+ * SendFetch packs the two global addresses and a load into a message
+ * and sends it.
+
+//@cindex FETCH
+
+ Structure of a FETCH message:
+
+ | GA 1 | GA 2 |
+ +------------------------------------+------+
+ | gtid | slot | weight | gtid | slot | load |
+ +------------------------------------+------+
+ */
+
+//@cindex sendFetch
+void
+sendFetch(globalAddr *rga, globalAddr *lga, int load)
+{
+ ASSERT(rga->weight > 0 && lga->weight > 0);
+ IF_PAR_DEBUG(fetch,
+ belch("~^** Sending Fetch for ((%x, %d, 0)); locally ((%x, %d, %x)), load = %d",
+ rga->payload.gc.gtid, rga->payload.gc.slot,
+ lga->payload.gc.gtid, lga->payload.gc.slot, lga->weight,
+ load));
+
+
+ /* ToDo: Dump event
+ DumpRawGranEvent(CURRENT_PROC, taskIDtoPE(rga->payload.gc.gtid),
+ GR_FETCH, CurrentTSO, (StgClosure *)(lga->payload.gc.slot),
+ 0, spark_queue_len(ADVISORY_POOL));
+ */
+
+ sendOpV(PP_FETCH, rga->payload.gc.gtid, 6,
+ (StgWord) rga->payload.gc.gtid, (StgWord) rga->payload.gc.slot,
+ (StgWord) lga->weight, (StgWord) lga->payload.gc.gtid,
+ (StgWord) lga->payload.gc.slot, (StgWord) load);
+}
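+
+/* Purely illustrative sketch of the word layout agreed between sendFetch
+   (above) and unpackFetch (below); the index names are hypothetical and
+   appear nowhere else in the RTS. */
+#if 0
+enum fetchMsgWord {       /* GetArgs/sendOpV slot:                     */
+  FETCH_GA_GTID      = 0, /* gtid of the GA being fetched              */
+  FETCH_GA_SLOT      = 1, /* slot of the GA being fetched              */
+  FETCH_REPLY_WEIGHT = 2, /* weight shipped with the reply-to GA       */
+  FETCH_REPLY_GTID   = 3, /* gtid of the reply-to GA (where to resume) */
+  FETCH_REPLY_SLOT   = 4, /* slot of the reply-to GA                   */
+  FETCH_LOAD         = 5  /* load figure of the sending PE             */
+};
+#endif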
+
+/*
+ * unpackFetch unpacks a FETCH message into two Global addresses and a load
+ * figure.
+*/
+
+//@cindex unpackFetch
+static void
+unpackFetch(globalAddr *lga, globalAddr *rga, int *load)
+{
+ long buf[6];
+
+ GetArgs(buf, 6);
+
+ IF_PAR_DEBUG(fetch,
+ belch("~^** Unpacking Fetch for ((%x, %d, 0)) to ((%x, %d, %x)), load = %d",
+ (GlobalTaskId) buf[0], (int) buf[1],
+ (GlobalTaskId) buf[3], (int) buf[4], buf[2], buf[5]));
+
+ lga->weight = 1;
+ lga->payload.gc.gtid = (GlobalTaskId) buf[0];
+ lga->payload.gc.slot = (int) buf[1];
+
+ rga->weight = (unsigned) buf[2];
+ rga->payload.gc.gtid = (GlobalTaskId) buf[3];
+ rga->payload.gc.slot = (int) buf[4];
+
+ *load = (int) buf[5];
+
+ ASSERT(rga->weight > 0);
+}
+
+/*
+ * SendResume packs the remote blocking queue's GA and data into a message
+ * and sends it.
+
+//@cindex RESUME
+
+ Structure of a RESUME message:
+
+ -------------------------------
+ | weight | slot | n | data ...
+ -------------------------------
+
+ data is a packed graph represented as an rtsPackBuffer
+ n is the size of the graph (as returned by PackNearbyGraph) + packet hdr size
+ */
+
+//@cindex sendResume
+void
+sendResume(globalAddr *rga, int nelem, rtsPackBuffer *packBuffer)
+{
+ IF_PAR_DEBUG(fetch,
+ belch("~^[] Sending Resume (packet <<%d>> with %d elems) for ((%x, %d, %x)) to [%x]",
+ packBuffer->id, nelem,
+ rga->payload.gc.gtid, rga->payload.gc.slot, rga->weight,
+ rga->payload.gc.gtid));
+ IF_PAR_DEBUG(packet,
+ PrintPacket(packBuffer));
+
+ ASSERT(nelem==packBuffer->size);
+ /* check for magic end-of-buffer word */
+ IF_DEBUG(sanity, ASSERT(*(packBuffer->buffer+nelem) == END_OF_BUFFER_MARKER));
+
+ sendOpNV(PP_RESUME, rga->payload.gc.gtid,
+ nelem + PACK_BUFFER_HDR_SIZE + DEBUG_HEADROOM, (StgPtr)packBuffer,
+ 2, (rtsWeight) rga->weight, (StgWord) rga->payload.gc.slot);
+}
+
+/*
+ * unpackResume unpacks a RESUME message into a Global address, the number
+ * of elements shipped and a pack buffer holding the graph.
+ */
+
+//@cindex unpackResume
+static void
+unpackResume(globalAddr *lga, int *nelem, rtsPackBuffer *packBuffer)
+{
+ long buf[3];
+
+ GetArgs(buf, 3);
+
+ /*
+ RESUME event is written in awaken_blocked_queue
+ DumpRawGranEvent(CURRENT_PROC, taskIDtoPE(lga->payload.gc.gtid),
+ GR_RESUME, END_TSO_QUEUE, (StgClosure *)NULL, 0, 0);
+ */
+
+ lga->weight = (unsigned) buf[0];
+ lga->payload.gc.gtid = mytid;
+ lga->payload.gc.slot = (int) buf[1];
+
+ *nelem = (int) buf[2] - PACK_BUFFER_HDR_SIZE - DEBUG_HEADROOM;
+ GetArgs(packBuffer, *nelem + PACK_BUFFER_HDR_SIZE + DEBUG_HEADROOM);
+
+ IF_PAR_DEBUG(fetch,
+ belch("~^[] Unpacking Resume (packet <<%d>> with %d elems) for ((%x, %d, %x))",
+ packBuffer->id, *nelem, mytid, (int) buf[1], (unsigned) buf[0]));
+
+ /* check for magic end-of-buffer word */
+ IF_DEBUG(sanity, ASSERT(*(packBuffer->buffer+*nelem) == END_OF_BUFFER_MARKER));
+}
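+
+/* Worked example of the size arithmetic above (numbers illustrative only):
+   if PackNearbyGraph produced a packet of nelem = 40 words, sendResume
+   advertises 40 + PACK_BUFFER_HDR_SIZE + DEBUG_HEADROOM words on the wire;
+   unpackResume subtracts the same two constants to recover nelem = 40 and,
+   under sanity checking, expects END_OF_BUFFER_MARKER at
+   packBuffer->buffer[40]. */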
+
+/*
+ * SendAck packs the global address being acknowledged, together with
+ * an array of global addresses for any closures shipped and sends them.
+
+//@cindex ACK
+
+ Structure of an ACK message:
+
+ | GA 1 | GA 2 |
+ +---------------------------------------------+-------
+ | weight | gtid | slot | weight | gtid | slot | ..... ngas times
+ + --------------------------------------------+-------
+
+ */
+
+//@cindex sendAck
+void
+sendAck(GlobalTaskId task, int ngas, globalAddr *gagamap)
+{
+ static long *buffer;
+ long *p;
+ int i;
+
+ if(ngas==0)
+ return; //don't send unnecessary messages!!
+
+ buffer = (long *) gumPackBuffer;
+
+ for(i = 0, p = buffer; i < ngas; i++, p += 6) {
+ ASSERT(gagamap[1].weight > 0);
+ p[0] = (long) gagamap->weight;
+ p[1] = (long) gagamap->payload.gc.gtid;
+ p[2] = (long) gagamap->payload.gc.slot;
+ gagamap++;
+ p[3] = (long) gagamap->weight;
+ p[4] = (long) gagamap->payload.gc.gtid;
+ p[5] = (long) gagamap->payload.gc.slot;
+ gagamap++;
+ }
+ IF_PAR_DEBUG(schedule,
+ belch("~^,, Sending Ack (%d pairs) to [%x]\n",
+ ngas, task));
+
+ sendOpN(PP_ACK, task, p - buffer, (StgPtr)buffer);
+}
+
+/*
+ * unpackAck unpacks an ACK message into a count of the GA pairs that
+ * follow and a map of Global addresses.
+ */
+
+//@cindex unpackAck
+static void
+unpackAck(int *ngas, globalAddr *gagamap)
+{
+ long GAarraysize;
+ long buf[6];
+
+ GetArgs(&GAarraysize, 1);
+
+ *ngas = GAarraysize / 6;
+
+ IF_PAR_DEBUG(schedule,
+ belch("~^,, Unpacking Ack (%d pairs) on [%x]\n",
+ *ngas, mytid));
+
+ while (GAarraysize > 0) {
+ GetArgs(buf, 6);
+ gagamap->weight = (rtsWeight) buf[0];
+ gagamap->payload.gc.gtid = (GlobalTaskId) buf[1];
+ gagamap->payload.gc.slot = (int) buf[2];
+ gagamap++;
+ gagamap->weight = (rtsWeight) buf[3];
+ gagamap->payload.gc.gtid = (GlobalTaskId) buf[4];
+ gagamap->payload.gc.slot = (int) buf[5];
+ ASSERT(gagamap->weight > 0);
+ gagamap++;
+ GAarraysize -= 6;
+ }
+}
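+
+/* Illustration of the pairing convention (as consumed by processAck below):
+   each 6-word entry is an (old GA, new GA) pair, roughly the GA under which
+   a closure was shipped inside a packed graph, followed by the GA the
+   unpacking PE now uses for its copy.  processAck looks the first GA up
+   locally and, if the second one is unknown here, turns the local RBH into
+   a FETCH_ME pointing at it. */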
+
+/*
+ * SendFish packs the originating PE's task id together with the age,
+ * history and hunger of a fish into a message and sends it to destPE.
+
+//@cindex FISH
+
+ Structure of a FISH message:
+
+ +----------------------------------+
+ | orig PE | age | history | hunger |
+ +----------------------------------+
+ */
+
+//@cindex sendFish
+void
+sendFish(GlobalTaskId destPE, GlobalTaskId origPE,
+ int age, int history, int hunger)
+{
+ IF_PAR_DEBUG(fish,
+ belch("~^$$ Sending Fish to [%x] (%d outstanding fishes)",
+ destPE, outstandingFishes));
+
+ sendOpV(PP_FISH, destPE, 4,
+ (StgWord) origPE, (StgWord) age, (StgWord) history, (StgWord) hunger);
+
+ if (origPE == mytid) {
+ //fishing = rtsTrue;
+ outstandingFishes++;
+ }
+}
+
+/*
+ * unpackFish unpacks a FISH message into the global task id of the
+ * originating PE and 3 data fields: the age, history and hunger of the
+ * fish. The history and hunger are not currently used.
+ */
+
+//@cindex unpackFish
+static void
+unpackFish(GlobalTaskId *origPE, int *age, int *history, int *hunger)
+{
+ long buf[4];
+
+ GetArgs(buf, 4);
+
+ IF_PAR_DEBUG(fish,
+ belch("~^$$ Unpacking Fish from [%x] (age=%d)",
+ (GlobalTaskId) buf[0], (int) buf[1]));
+
+ *origPE = (GlobalTaskId) buf[0];
+ *age = (int) buf[1];
+ *history = (int) buf[2];
+ *hunger = (int) buf[3];
+}
+
+/*
+ * SendFree sends (weight, slot) pairs for GAs that we no longer need
+ * references to.
+
+//@cindex FREE
+
+ Structure of a FREE message:
+
+ +-----------------------------
+ | n | weight_1 | slot_1 | ...
+ +-----------------------------
+ */
+//@cindex sendFree
+void
+sendFree(GlobalTaskId pe, int nelem, StgPtr data)
+{
+ IF_PAR_DEBUG(free,
+ belch("~^!! Sending Free (%d GAs) to [%x]",
+ nelem/2, pe));
+
+ sendOpN(PP_FREE, pe, nelem, data);
+}
+
+/*
+ * unpackFree unpacks a FREE message into the amount of data shipped and
+ * a data block.
+ */
+//@cindex unpackFree
+static void
+unpackFree(int *nelem, StgWord *data)
+{
+ long buf[1];
+
+ GetArgs(buf, 1);
+ *nelem = (int) buf[0];
+
+ IF_PAR_DEBUG(free,
+ belch("~^!! Unpacking Free (%d GAs)",
+ *nelem/2));
+
+ GetArgs(data, *nelem);
+}
+
+/*
+ * SendSchedule sends a closure to be evaluated in response to a Fish
+ * message. The message is directed to the PE that originated the Fish
+ * (origPE), and includes the packed closure (data) along with its size
+ * (nelem).
+
+//@cindex SCHEDULE
+
+ Structure of a SCHEDULE message:
+
+ +------------------------------------
+ | PE | n | pack buffer of a graph ...
+ +------------------------------------
+ */
+//@cindex sendSchedule
+void
+sendSchedule(GlobalTaskId origPE, int nelem, rtsPackBuffer *packBuffer)
+{
+ IF_PAR_DEBUG(schedule,
+ belch("~^-- Sending Schedule (packet <<%d>> with %d elems) to [%x]\n",
+ packBuffer->id, nelem, origPE));
+ IF_PAR_DEBUG(packet,
+ PrintPacket(packBuffer));
+
+ ASSERT(nelem==packBuffer->size);
+ /* check for magic end-of-buffer word */
+ IF_DEBUG(sanity, ASSERT(*(packBuffer->buffer+nelem) == END_OF_BUFFER_MARKER));
+
+ sendOpN(PP_SCHEDULE, origPE,
+ nelem + PACK_BUFFER_HDR_SIZE + DEBUG_HEADROOM, (StgPtr)packBuffer);
+}
+
+/*
+ * unpackSchedule unpacks a SCHEDULE message into the amount of data
+ * shipped (nelem) and the pack buffer holding the packed graph.
+ */
+
+//@cindex unpackSchedule
+static void
+unpackSchedule(int *nelem, rtsPackBuffer *packBuffer)
+{
+ long buf[1];
+
+ /* first, just unpack 1 word containing the total size (including header) */
+ GetArgs(buf, 1);
+ /* no. of elems, not counting the header of the pack buffer */
+ *nelem = (int) buf[0] - PACK_BUFFER_HDR_SIZE - DEBUG_HEADROOM;
+
+ /* automatic cast of flat pvm-data to rtsPackBuffer */
+ GetArgs(packBuffer, *nelem + PACK_BUFFER_HDR_SIZE + DEBUG_HEADROOM);
+
+ IF_PAR_DEBUG(schedule,
+ belch("~^-- Unpacking Schedule (packet <<%d>> with %d elems) on [%x]\n",
+ packBuffer->id, *nelem, mytid));
+
+ ASSERT(*nelem==packBuffer->size);
+ /* check for magic end-of-buffer word */
+ IF_DEBUG(sanity, ASSERT(*(packBuffer->buffer+*nelem) == END_OF_BUFFER_MARKER));
+}
+
+#ifdef DIST
+/* sendReval is almost identical to the Schedule version, so we can unpack with unpackSchedule */
+void
+sendReval(GlobalTaskId origPE, int nelem, rtsPackBuffer *packBuffer)
+{
+ IF_PAR_DEBUG(schedule,
+ belch("~^-- Sending Reval (packet <<%d>> with %d elems) to [%x]\n",
+ packBuffer->id, nelem, origPE));
+ IF_PAR_DEBUG(packet,
+ PrintPacket(packBuffer));
+
+ ASSERT(nelem==packBuffer->size);
+ /* check for magic end-of-buffer word */
+ IF_DEBUG(sanity, ASSERT(*(packBuffer->buffer+nelem) == END_OF_BUFFER_MARKER));
+
+ sendOpN(PP_REVAL, origPE,
+ nelem + PACK_BUFFER_HDR_SIZE + DEBUG_HEADROOM, (StgPtr)packBuffer);
+}
+
+void FinishReval(StgTSO *t)
+{ StgClosure *res;
+ globalAddr ga;
+ nat size;
+ rtsPackBuffer *buffer=NULL;
+
+ ga.payload.gc.slot = t->revalSlot;
+ ga.payload.gc.gtid = t->revalTid;
+ ga.weight = 0;
+
+ //find where the reval result is
+ res = GALAlookup(&ga);
+ ASSERT(res);
+
+ IF_PAR_DEBUG(schedule,
+ printGA(&ga);
+ belch(" needs the result %08x\n",res));
+
+ //send off the result
+ buffer = PackNearbyGraph(res, END_TSO_QUEUE, &size,ga.payload.gc.gtid);
+ ASSERT(buffer != (rtsPackBuffer *)NULL);
+ sendResume(&ga, size, buffer);
+
+ IF_PAR_DEBUG(schedule,
+ belch("@;~) Reval Finished"));
+}
+
+#endif /* DIST */
+
+//@node Message-Processing Functions, GUM Message Processor, GUM Message Sending and Unpacking Functions, High Level Communications Routines
+//@subsection Message-Processing Functions
+
+/*
+ * Message-Processing Functions
+ *
+ * The following routines process incoming GUM messages, often reissuing
+ * messages in response.
+ *
+ * processFish unpacks a fish message, reissuing it if it's our own,
+ * sending work if we have it or sending it onwards otherwise.
+ */
+
+/*
+ * processFetches constructs and sends resume messages for every
+ * BlockedFetch which is ready to be awakened.
+ * awaken_blocked_queue (in Schedule.c) is responsible for moving
+ * BlockedFetches from a blocking queue to the PendingFetches queue.
+ */
+void GetRoots(void);
+extern StgBlockedFetch *PendingFetches;
+
+nat
+pending_fetches_len(void)
+{
+ StgBlockedFetch *bf;
+ nat n;
+
+ for (n=0, bf=PendingFetches; bf != END_BF_QUEUE; n++, bf = (StgBlockedFetch *)(bf->link)) {
+ ASSERT(get_itbl(bf)->type==BLOCKED_FETCH);
+ }
+ return n;
+}
+
+//@cindex processFetches
+void
+processFetches(void) {
+ StgBlockedFetch *bf, *next;
+ StgClosure *closure;
+ StgInfoTable *ip;
+ globalAddr rga;
+ static rtsPackBuffer *packBuffer;
+
+ IF_PAR_DEBUG(verbose,
+ belch("____ processFetches: %d pending fetches (root @ %p)",
+ pending_fetches_len(), PendingFetches));
+
+ for (bf = PendingFetches;
+ bf != END_BF_QUEUE;
+ bf=next) {
+ /* the PendingFetches list contains only BLOCKED_FETCH closures */
+ ASSERT(get_itbl(bf)->type==BLOCKED_FETCH);
+ /* store link (we might overwrite it via blockFetch later on) */
+ next = (StgBlockedFetch *)(bf->link);
+
+ /*
+ * Find the target at the end of the indirection chain, and
+ * process it in much the same fashion as the original target
+ * of the fetch. Though we hope to find graph here, we could
+ * find a black hole (of any flavor) or even a FetchMe.
+ */
+ closure = bf->node;
+ /*
+ We evacuate BQs and update the node fields where necessary in GC.c
+ So, if we find an EVACUATED closure, something has gone Very Wrong
+ (and therefore we let the RTS crash most ungracefully).
+ */
+ ASSERT(get_itbl(closure)->type != EVACUATED);
+ // closure = ((StgEvacuated *)closure)->evacuee;
+
+ closure = UNWIND_IND(closure);
+ //while ((ind = IS_INDIRECTION(closure)) != NULL) { closure = ind; }
+
+ ip = get_itbl(closure);
+ if (ip->type == FETCH_ME) {
+ /* Forward the Fetch to someone else */
+ rga.payload.gc.gtid = bf->ga.payload.gc.gtid;
+ rga.payload.gc.slot = bf->ga.payload.gc.slot;
+ rga.weight = bf->ga.weight;
+
+ sendFetch(((StgFetchMe *)closure)->ga, &rga, 0 /* load */);
+
+ // Global statistics: count no. of fetches
+ if (RtsFlags.ParFlags.ParStats.Global &&
+ RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
+ globalParStats.tot_fetch_mess++;
+ }
+
+ IF_PAR_DEBUG(fetch,
+ belch("__-> processFetches: Forwarding fetch from %lx to %lx",
+ mytid, rga.payload.gc.gtid));
+
+ } else if (IS_BLACK_HOLE(closure)) {
+ IF_PAR_DEBUG(verbose,
+ belch("__++ processFetches: trying to send a BLACK_HOLE => doing a blockFetch on closure %p (%s)",
+ closure, info_type(closure)));
+ bf->node = closure;
+ blockFetch(bf, closure);
+ } else {
+ /* We now have some local graph to send back */
+ nat size;
+
+ packBuffer = gumPackBuffer;
+ IF_PAR_DEBUG(verbose,
+ belch("__*> processFetches: PackNearbyGraph of closure %p (%s)",
+ closure, info_type(closure)));
+
+ if ((packBuffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size, bf->ga.payload.gc.gtid)) == NULL) {
+ // Put current BF back on list
+ bf->link = (StgBlockingQueueElement *)PendingFetches;
+ PendingFetches = (StgBlockedFetch *)bf;
+ // ToDo: check that nothing more has to be done to prepare for GC!
+ barf("processFetches: out of heap while packing graph; ToDo: call GC here");
+ GarbageCollect(GetRoots, rtsFalse);
+ bf = PendingFetches;
+ PendingFetches = (StgBlockedFetch *)(bf->link);
+ closure = bf->node;
+ packBuffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size, bf->ga.payload.gc.gtid);
+ ASSERT(packBuffer != (rtsPackBuffer *)NULL);
+ }
+ rga.payload.gc.gtid = bf->ga.payload.gc.gtid;
+ rga.payload.gc.slot = bf->ga.payload.gc.slot;
+ rga.weight = bf->ga.weight;
+
+ sendResume(&rga, size, packBuffer);
+
+ // Global statistics: count no. of resume messages
+ if (RtsFlags.ParFlags.ParStats.Global &&
+ RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
+ globalParStats.tot_resume_mess++;
+ }
+ }
+ }
+ PendingFetches = END_BF_QUEUE;
+}
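+
+/* A minimal sketch (hypothetical caller, not the actual Schedule.c code) of
+   how the scheduler is expected to drain PendingFetches between running
+   threads: */
+#if 0
+  if (PendingFetches != END_BF_QUEUE) {
+    processFetches();
+  }
+#endif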
+
+#if 0
+/*
+ As an alternative to sending fetch messages directly from the FETCH_ME_entry
+ code, we could just store the data about the remote closure in a global
+ variable and send the fetch request from the main scheduling loop (similar
+ to processFetches above). This would save an expensive STGCALL in the entry
+ code because we have to go back to the scheduler anyway.
+*/
+//@cindex processTheRealFetches
+void
+processTheRealFetches(void) {
+ StgBlockedFetch *bf;
+ StgClosure *closure, *next;
+
+ IF_PAR_DEBUG(verbose,
+ belch("__ processTheRealFetches: ");
+ printGA(&theGlobalFromGA);
+ printGA(&theGlobalToGA));
+
+ ASSERT(theGlobalFromGA.payload.gc.gtid != 0 &&
+ theGlobalToGA.payload.gc.gtid != 0);
+
+ /* the old version did this in the FETCH_ME entry code */
+ sendFetch(&theGlobalFromGA, &theGlobalToGA, 0/*load*/);
+
+}
+#endif
+
+
+/*
+ Way of dealing with unwanted fish, e.g. during startup/shutdown or fish
+ arriving from unknown PEs: such fish are bounced back to their originating PE.
+*/
+void
+bounceFish(void) {
+ GlobalTaskId origPE;
+ int age, history, hunger;
+
+ /* IF_PAR_DEBUG(verbose, */
+ belch(".... [%x] Bouncing unwanted FISH",mytid);
+
+ unpackFish(&origPE, &age, &history, &hunger);
+
+ if (origPE == mytid) {
+ //fishing = rtsFalse; // fish has come home
+ outstandingFishes--;
+ last_fish_arrived_at = CURRENT_TIME; // remember time (see schedule fct)
+ return; // that's all
+ }
+
+ /* otherwise, send it home to die */
+ sendFish(origPE, origPE, (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
+ // Global statistics: count no. of fish messages
+ if (RtsFlags.ParFlags.ParStats.Global &&
+ RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
+ globalParStats.tot_fish_mess++;
+ }
+}
+
+/*
+ * processFish unpacks a fish message, reissuing it if it's our own,
+ * sending work if we have it or sending it onwards otherwise.
+ */
+//@cindex processFish
+static void
+processFish(void)
+{
+ GlobalTaskId origPE;
+ int age, history, hunger;
+ rtsSpark spark;
+ static rtsPackBuffer *packBuffer;
+
+ unpackFish(&origPE, &age, &history, &hunger);
+
+ if (origPE == mytid) {
+ //fishing = rtsFalse; // fish has come home
+ outstandingFishes--;
+ last_fish_arrived_at = CURRENT_TIME; // remember time (see schedule fct)
+ return; // that's all
+ }
+
+ ASSERT(origPE != mytid);
+ IF_PAR_DEBUG(fish,
+ belch("$$__ processing fish; %d sparks available",
+ spark_queue_len(&(MainRegTable.rSparks))));
+ while ((spark = findSpark(rtsTrue/*for_export*/)) != NULL) {
+ nat size;
+ // StgClosure *graph;
+
+ packBuffer = gumPackBuffer;
+ ASSERT(closure_SHOULD_SPARK((StgClosure *)spark));
+ if ((packBuffer = PackNearbyGraph(spark, END_TSO_QUEUE, &size,origPE)) == NULL) {
+ IF_PAR_DEBUG(fish,
+ belch("$$ GC while trying to satisfy FISH via PackNearbyGraph of node %p",
+ (StgClosure *)spark));
+ barf("processFish: out of heap while packing graph; ToDo: call GC here");
+ GarbageCollect(GetRoots, rtsFalse);
+ /* Now go back and try again */
+ } else {
+ IF_PAR_DEBUG(verbose,
+ if (RtsFlags.ParFlags.ParStats.Sparks)
+ belch("==== STEALING spark %x; sending to %x", spark, origPE));
+
+ IF_PAR_DEBUG(fish,
+ belch("$$-- Replying to FISH from %x by sending graph @ %p (%s)",
+ origPE,
+ (StgClosure *)spark, info_type((StgClosure *)spark)));
+ sendSchedule(origPE, size, packBuffer);
+ disposeSpark(spark);
+ // Global statistics: count no. of schedule messages
+ if (RtsFlags.ParFlags.ParStats.Global &&
+ RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
+ globalParStats.tot_schedule_mess++;
+ }
+
+ break;
+ }
+ }
+ if (spark == (rtsSpark)NULL) {
+ IF_PAR_DEBUG(fish,
+ belch("$$^^ No sparks available for FISH from %x",
+ origPE));
+ /* We have no sparks to give */
+ if (age < FISH_LIFE_EXPECTANCY) {
+ /* and the fish is still young, send it to another PE to look for work */
+ sendFish(choosePE(), origPE,
+ (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
+
+ // Global statistics: count no. of fish messages
+ if (RtsFlags.ParFlags.ParStats.Global &&
+ RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
+ globalParStats.tot_fish_mess++;
+ }
+ } else { /* otherwise, send it home to die */
+ sendFish(origPE, origPE, (age + 1), NEW_FISH_HISTORY, NEW_FISH_HUNGER);
+ // Global statistics: count no. of fish messages
+ if (RtsFlags.ParFlags.ParStats.Global &&
+ RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
+ globalParStats.tot_fish_mess++;
+ }
+ }
+ }
+} /* processFish */
+
+/*
+ * processFetch either returns the requested data (if available)
+ * or blocks the remote blocking queue on a black hole (if not).
+ */
+
+//@cindex processFetch
+static void
+processFetch(void)
+{
+ globalAddr ga, rga;
+ int load;
+ StgClosure *closure;
+ StgInfoTable *ip;
+
+ unpackFetch(&ga, &rga, &load);
+ IF_PAR_DEBUG(fetch,
+ belch("%%%%__ Rcvd Fetch for ((%x, %d, 0)), Resume ((%x, %d, %x)) (load %d) from %x",
+ ga.payload.gc.gtid, ga.payload.gc.slot,
+ rga.payload.gc.gtid, rga.payload.gc.slot, rga.weight, load,
+ rga.payload.gc.gtid));
+
+ closure = GALAlookup(&ga);
+ ASSERT(closure != (StgClosure *)NULL);
+ ip = get_itbl(closure);
+ if (ip->type == FETCH_ME) {
+ /* Forward the Fetch to someone else */
+ sendFetch(((StgFetchMe *)closure)->ga, &rga, load);
+
+ // Global statistics: count no. of fetches
+ if (RtsFlags.ParFlags.ParStats.Global &&
+ RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
+ globalParStats.tot_fetch_mess++;
+ }
+ } else if (rga.payload.gc.gtid == mytid) {
+ /* Our own FETCH forwarded back around to us */
+ StgFetchMeBlockingQueue *fmbq = (StgFetchMeBlockingQueue *)GALAlookup(&rga);
+
+ IF_PAR_DEBUG(fetch,
+ belch("%%%%== Fetch returned to sending PE; closure=%p (%s); receiver=%p (%s)",
+ closure, info_type(closure), fmbq, info_type((StgClosure*)fmbq)));
+ /* We may have already discovered that the fetch target is our own. */
+ if ((StgClosure *)fmbq != closure)
+ CommonUp((StgClosure *)fmbq, closure);
+ (void) addWeight(&rga);
+ } else if (IS_BLACK_HOLE(closure)) {
+ /* This includes RBH's and FMBQ's */
+ StgBlockedFetch *bf;
+
+ /* Can we assert something on the remote GA? */
+ ASSERT(GALAlookup(&rga) == NULL);
+
+ /* If we're hitting a BH or RBH or FMBQ we have to put a BLOCKED_FETCH
+ closure into the BQ in order to denote that when updating this node
+ the result should be sent to the originator of this fetch message. */
+ bf = (StgBlockedFetch *)createBlockedFetch(ga, rga);
+ IF_PAR_DEBUG(fetch,
+ belch("%%++ Blocking Fetch ((%x, %d, %x)) on %p (%s)",
+ rga.payload.gc.gtid, rga.payload.gc.slot, rga.weight,
+ closure, info_type(closure)));
+ blockFetch(bf, closure);
+ } else {
+ /* The target of the FetchMe is some local graph */
+ nat size;
+ // StgClosure *graph;
+ rtsPackBuffer *buffer = (rtsPackBuffer *)NULL;
+
+ if ((buffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size, rga.payload.gc.gtid)) == NULL) {
+ barf("processFetch: out of heap while packing graph; ToDo: call GC here");
+ GarbageCollect(GetRoots, rtsFalse);
+ closure = GALAlookup(&ga);
+ buffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size, rga.payload.gc.gtid);
+ ASSERT(buffer != (rtsPackBuffer *)NULL);
+ }
+ sendResume(&rga, size, buffer);
+
+ // Global statistics: count no. of resume messages
+ if (RtsFlags.ParFlags.ParStats.Global &&
+ RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
+ globalParStats.tot_resume_mess++;
+ }
+ }
+}
+
+/*
+ The list of pending fetches must be a root-list for GC.
+ This routine is called from GC.c (same as marking GAs etc).
+*/
+void
+markPendingFetches(rtsBool major_gc) {
+
+ /* No need to traverse the list; this is done via the scavenge code
+ for a BLOCKED_FETCH closure, which evacuates the link field */
+
+ if (PendingFetches != END_BF_QUEUE ) {
+ IF_PAR_DEBUG(tables,
+ fprintf(stderr, "@@@@ PendingFetches is root; evaced from %p to",
+ PendingFetches));
+
+ PendingFetches = MarkRoot((StgClosure*)PendingFetches);
+
+ IF_PAR_DEBUG(verbose,
+ fprintf(stderr, " %p\n", PendingFetches));
+
+ } else {
+ IF_PAR_DEBUG(tables,
+ fprintf(stderr, "@@@@ PendingFetches is empty; no need to mark it\n"));
+ }
+}
+
+/*
+ * processFree unpacks a FREE message and adds the weights to our GAs.
+ */
+//@cindex processFree
+static void
+processFree(void)
+{
+ int nelem;
+ static StgWord *buffer;
+ int i;
+ globalAddr ga;
+
+ buffer = (StgWord *)gumPackBuffer;
+ unpackFree(&nelem, buffer);
+ IF_PAR_DEBUG(free,
+ belch("!!__ Rcvd Free (%d GAs)", nelem / 2));
+
+ ga.payload.gc.gtid = mytid;
+ for (i = 0; i < nelem;) {
+ ga.weight = (rtsWeight) buffer[i++];
+ ga.payload.gc.slot = (int) buffer[i++];
+ IF_PAR_DEBUG(free,
+ fprintf(stderr, "!!-- Processing free ");
+ printGA(&ga);
+ fputc('\n', stderr);
+ );
+ (void) addWeight(&ga);
+ }
+}
+
+/*
+ * processResume unpacks a RESUME message into the graph, filling in
+ * the LA -> GA, and GA -> LA tables. Threads blocked on the original
+ * FetchMe (now a blocking queue) are awakened, and the blocking queue
+ * is converted into an indirection. Finally it sends an ACK in response
+ * which contains any newly allocated GAs.
+ */
+
+//@cindex processResume
+static void
+processResume(GlobalTaskId sender)
+{
+ int nelem;
+ nat nGAs;
+ static rtsPackBuffer *packBuffer;
+ StgClosure *newGraph, *old;
+ globalAddr lga;
+ globalAddr *gagamap;
+
+ packBuffer = (rtsPackBuffer *)gumPackBuffer;
+ unpackResume(&lga, &nelem, packBuffer);
+
+ IF_PAR_DEBUG(fetch,
+ fprintf(stderr, "[]__ Rcvd Resume for ");
+ printGA(&lga);
+ fputc('\n', stderr));
+ IF_PAR_DEBUG(packet,
+ PrintPacket((rtsPackBuffer *)packBuffer));
+
+ /*
+ * We always unpack the incoming graph, even if we've received the
+ * requested node in some other data packet (and already awakened
+ * the blocking queue).
+ if (SAVE_Hp + packBuffer[0] >= SAVE_HpLim) {
+ ReallyPerformThreadGC(packBuffer[0], rtsFalse);
+ SAVE_Hp -= packBuffer[0];
+ }
+ */
+
+ // ToDo: Check for GC here !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+
+ /* Do this *after* GC; we don't want to release the object early! */
+
+ if (lga.weight > 0)
+ (void) addWeight(&lga);
+
+ old = GALAlookup(&lga);
+
+ /* ToDo: The closure that requested this graph must be one of these two?*/
+ ASSERT(get_itbl(old)->type == FETCH_ME_BQ ||
+ get_itbl(old)->type == RBH);
+
+ if (RtsFlags.ParFlags.ParStats.Full) {
+ StgBlockingQueueElement *bqe, *last_bqe;
+
+ IF_PAR_DEBUG(fetch,
+ belch("[]-- Resume is REPLY to closure %lx", old));
+
+ /* Write REPLY events to the log file, indicating that the remote
+ data has arrived
+ NB: we emit a REPLY only for the *last* elem in the queue; this is
+ the one that triggered the fetch message; all other entries
+ have just added themselves to the queue, waiting for the data
+ they know has already been requested (see entry code for FETCH_ME_BQ)
+ */
+ if ((get_itbl(old)->type == FETCH_ME_BQ ||
+ get_itbl(old)->type == RBH)) {
+ for (bqe = ((StgFetchMeBlockingQueue *)old)->blocking_queue,
+ last_bqe = END_BQ_QUEUE;
+ get_itbl(bqe)->type==TSO ||
+ get_itbl(bqe)->type==BLOCKED_FETCH;
+ last_bqe = bqe, bqe = bqe->link) { /* nothing */ }
+
+ ASSERT(last_bqe==END_BQ_QUEUE ||
+ get_itbl((StgClosure *)last_bqe)->type == TSO);
+
+ /* last_bqe now points to the TSO that triggered the FETCH */
+ if (get_itbl((StgClosure *)last_bqe)->type == TSO)
+ DumpRawGranEvent(CURRENT_PROC, taskIDtoPE(sender),
+ GR_REPLY, ((StgTSO *)last_bqe), ((StgTSO *)last_bqe)->block_info.closure,
+ 0, spark_queue_len(&(MainRegTable.rSparks)));
+ }
+ }
+
+ newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
+ ASSERT(newGraph != NULL);
+
+ /*
+ * Sometimes, unpacking will common up the resumee with the
+ * incoming graph, but if it hasn't, we'd better do so now.
+ */
+
+ if (get_itbl(old)->type == FETCH_ME_BQ)
+ CommonUp(old, newGraph);
+
+ IF_PAR_DEBUG(fetch,
+ belch("[]-- Ready to resume unpacked graph at %p (%s)",
+ newGraph, info_type(newGraph)));
+
+ IF_PAR_DEBUG(tables,
+ DebugPrintGAGAMap(gagamap, nGAs));
+
+ sendAck(sender, nGAs, gagamap);
+}
+
+/*
+ * processSchedule unpacks a SCHEDULE message into the graph, filling
+ * in the LA -> GA, and GA -> LA tables. The root of the graph is added to
+ * the local spark queue. Finally it sends an ACK in response
+ * which contains any newly allocated GAs.
+ */
+//@cindex processSchedule
+static void
+processSchedule(GlobalTaskId sender)
+{
+ nat nelem, nGAs;
+ rtsBool success;
+ static rtsPackBuffer *packBuffer;
+ StgClosure *newGraph;
+ globalAddr *gagamap;
+
+ packBuffer = gumPackBuffer; /* HWL */
+ unpackSchedule(&nelem, packBuffer);
+
+ IF_PAR_DEBUG(schedule,
+ belch("--__ Rcvd Schedule (%d elems)", nelem));
+ IF_PAR_DEBUG(packet,
+ PrintPacket(packBuffer));
+
+ /*
+ * For now, the graph is a closure to be sparked as an advisory
+ * spark, but in future it may be a complete spark with
+ * required/advisory status, priority etc.
+ */
+
+ /*
+ space_required = packBuffer[0];
+ if (SAVE_Hp + space_required >= SAVE_HpLim) {
+ ReallyPerformThreadGC(space_required, rtsFalse);
+ SAVE_Hp -= space_required;
+ }
+ */
+ // ToDo: check whether GC is necessary !!!!!!!!!!!!!!!!!!!!!
+ newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
+ ASSERT(newGraph != NULL);
+ success = add_to_spark_queue(newGraph, &(MainRegTable.rSparks));
+
+ if (RtsFlags.ParFlags.ParStats.Full &&
+ RtsFlags.ParFlags.ParStats.Sparks &&
+ success)
+ DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
+ GR_STOLEN, ((StgTSO *)NULL), newGraph,
+ 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
+
+ IF_PAR_DEBUG(schedule,
+ if (success)
+ belch("--^^ added spark to unpacked graph %p (%s); %d sparks available on [%x] (%s)",
+ newGraph, info_type(newGraph), spark_queue_len(&(MainRegTable.rSparks)), mytid);
+ else
+ belch("--^^ received non-sparkable closure %p (%s); nothing added to spark pool; %d sparks available on [%x]",
+ newGraph, info_type(newGraph), spark_queue_len(&(MainRegTable.rSparks)), mytid));
+ IF_PAR_DEBUG(packet,
+ belch("*< Unpacked graph with root at %p (%s):",
+ newGraph, info_type(newGraph));
+ PrintGraph(newGraph, 0));
+
+ IF_PAR_DEBUG(tables,
+ DebugPrintGAGAMap(gagamap, nGAs));
+
+ sendAck(sender, nGAs, gagamap);
+
+ //fishing = rtsFalse;
+ ASSERT(outstandingFishes>0);
+ outstandingFishes--;
+}
+
+/*
+ * processAck unpacks an ACK, and uses the GAGA map to convert RBH's
+ * (which represent shared thunks that have been shipped) into fetch-mes
+ * to remote GAs.
+ */
+//@cindex processAck
+static void
+processAck(void)
+{
+ nat nGAs;
+ globalAddr *gaga;
+ globalAddr gagamap[256]; // ToDo: elim magic constant!! MAX_GAS * 2];??
+
+ unpackAck(&nGAs, gagamap);
+
+ IF_PAR_DEBUG(tables,
+ belch(",,,, Rcvd Ack (%d pairs)", nGAs);
+ DebugPrintGAGAMap(gagamap, nGAs));
+
+ IF_DEBUG(sanity,
+ checkGAGAMap(gagamap, nGAs));
+
+ /*
+ * For each (oldGA, newGA) pair, set the GA of the corresponding
+ * thunk to the newGA, convert the thunk to a FetchMe, and return
+ * the weight from the oldGA.
+ */
+ for (gaga = gagamap; gaga < gagamap + nGAs * 2; gaga += 2) {
+ StgClosure *old_closure = GALAlookup(gaga);
+ StgClosure *new_closure = GALAlookup(gaga + 1);
+
+ ASSERT(old_closure != NULL);
+ if (new_closure == NULL) {
+ /* We don't have this closure, so we make a fetchme for it */
+ globalAddr *ga = setRemoteGA(old_closure, gaga + 1, rtsTrue);
+
+ /* convertToFetchMe should be done unconditionally here.
+ Currently, we assign GAs to CONSTRs, too, (a bit of a hack),
+ so we have to check whether it is an RBH before converting
+
+ ASSERT(get_itbl(old_closure)==RBH);
+ */
+ if (get_itbl(old_closure)->type==RBH)
+ convertToFetchMe((StgRBH *)old_closure, ga);
+ } else {
+ /*
+ * Oops...we've got this one already; update the RBH to
+ * point to the object we already know about, whatever it
+ * happens to be.
+ */
+ CommonUp(old_closure, new_closure);
+
+ /*
+ * Increase the weight of the object by the amount just
+ * received in the second part of the ACK pair.
+ */
+ (void) addWeight(gaga + 1);
+ }
+ (void) addWeight(gaga);
+ }
+
+ /* check the sanity of the LAGA and GALA tables after mincing them */
+ IF_DEBUG(sanity, checkLAGAtable(rtsFalse));
+}
+
+#ifdef DIST
+
+void
+bounceReval(void) {
+ barf("Task %x: TODO: should send NACK in response to REVAL",mytid);
+}
+
+static void
+processReval(GlobalTaskId sender) //similar to schedule...
+{ nat nelem, space_required, nGAs;
+ static rtsPackBuffer *packBuffer;
+ StgClosure *newGraph;
+ globalAddr *gagamap;
+ StgTSO* tso;
+ globalAddr *ga;
+
+ packBuffer = gumPackBuffer; /* HWL */
+ unpackSchedule(&nelem, packBuffer); /* okay, since the structure is the same */
+
+ IF_PAR_DEBUG(packet,
+ belch("@;~) [%x] Rcvd Reval (%d elems)", mytid, nelem);
+ PrintPacket(packBuffer));
+
+ /*
+ space_required = packBuffer[0];
+ if (SAVE_Hp + space_required >= SAVE_HpLim) {
+ ReallyPerformThreadGC(space_required, rtsFalse);
+ SAVE_Hp -= space_required;
+ }
+ */
+
+ // ToDo: check whether GC is necessary !!!!!!!!!!!!!!!!!!!!!
+ newGraph = UnpackGraph(packBuffer, &gagamap, &nGAs);
+ ASSERT(newGraph != NULL);
+
+ IF_PAR_DEBUG(packet,
+ belch("@;~) Unpacked graph with root at %p (%s):",
+ newGraph, info_type(newGraph));
+ PrintGraph(newGraph, 0));
+
+ IF_PAR_DEBUG(tables,
+ DebugPrintGAGAMap(gagamap, nGAs));
+
+ IF_PAR_DEBUG(tables,
+ printLAGAtable();
+ DebugPrintGAGAMap(gagamap, nGAs));
+
+ //We don't send an Ack to the head!!!!
+ ASSERT(nGAs>0);
+ sendAck(sender, nGAs-1, gagamap+2);
+
+ IF_PAR_DEBUG(verbose,
+ belch("@;~) About to create Reval thread on behalf of %x",
+ sender));
+
+ tso=createGenThread(RtsFlags.GcFlags.initialStkSize,newGraph);
+ tso->priority=RevalPriority;
+ tso->revalSlot=gagamap->payload.gc.slot;//record who sent the reval
+ tso->revalTid =gagamap->payload.gc.gtid;
+ scheduleThread(tso);
+ context_switch = 1; // switch at the earliest opportunity
+}
+#endif
+
+
+//@node GUM Message Processor, Miscellaneous Functions, Message-Processing Functions, High Level Communications Routines
+//@subsection GUM Message Processor
+
+/*
+ * GUM Message Processor
+
+ * processMessages processes any messages that have arrived, calling
+ * appropriate routines depending on the message tag
+ * (opcode). N.B. Unless profiling it assumes that there {\em ARE} messages
+ * present and performs a blocking receive! During profiling it
+ * busy-waits in order to record idle time.
+ */
+
+//@cindex processMessages
+rtsBool
+processMessages(void)
+{
+ rtsPacket packet;
+ OpCode opcode;
+ GlobalTaskId task;
+ rtsBool receivedFinish = rtsFalse;
+
+ do {
+ packet = GetPacket(); /* Get next message; block until one available */
+ getOpcodeAndSender(packet, &opcode, &task);
+
+ if (task==SysManTask) {
+ switch (opcode) {
+ case PP_PETIDS:
+ processPEtids();
+ break;
+
+ case PP_FINISH:
+ IF_PAR_DEBUG(verbose,
+ belch("==== received FINISH [%p]", mytid));
+ /* this boolean value is returned and propagated to the main
+ scheduling loop, thus shutting-down this PE */
+ receivedFinish = rtsTrue;
+ break;
+
+ default:
+ barf("Task %x: received unknown opcode %x from SysMan",mytid, opcode);
+ }
+ } else if (taskIDtoPE(task)==0) {
+ /* When a new PE joins, FISH and REVAL messages may potentially reach
+ PEs before they have been notified of the new PE's existence. The
+ only solution is to bounce/fail these messages back to the sender.
+ But we will worry about it once we start seeing these race
+ conditions! */
+ switch (opcode) {
+ case PP_FISH:
+ bounceFish();
+ break;
+#ifdef DIST
+ case PP_REVAL:
+ bounceReval();
+ break;
+#endif
+ case PP_PETIDS:
+ belch("Task %x: Ignoring PVM session opened by another SysMan %x",mytid,task);
+ break;
+
+ case PP_FINISH:
+ break;
+
+ default:
+ belch("Task %x: Ignoring opcode %x from unknown PE %x",mytid, opcode, task);
+ }
+ } else
+ switch (opcode) {
+ case PP_FETCH:
+ processFetch();
+ // Global statistics: count no. of fetches
+ if (RtsFlags.ParFlags.ParStats.Global &&
+ RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
+ globalParStats.rec_fetch_mess++;
+ }
+ break;
+
+ case PP_RESUME:
+ processResume(task);
+ // Global statistics: count no. of resume messages
+ if (RtsFlags.ParFlags.ParStats.Global &&
+ RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
+ globalParStats.rec_resume_mess++;
+ }
+ break;
+
+ case PP_ACK:
+ processAck();
+ break;
+
+ case PP_FISH:
+ processFish();
+ // Global statistics: count no. of fish messages
+ if (RtsFlags.ParFlags.ParStats.Global &&
+ RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
+ globalParStats.rec_fish_mess++;
+ }
+ break;
+
+ case PP_FREE:
+ processFree();
+ break;
+
+ case PP_SCHEDULE:
+ processSchedule(task);
+ // Global statistics: count no. of schedule messages
+ if (RtsFlags.ParFlags.ParStats.Global &&
+ RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
+ globalParStats.rec_schedule_mess++;
+ }
+ break;
+
+#ifdef DIST
+ case PP_REVAL:
+ processReval(task);
+ // Global statistics: count no. of reval messages
+ if (RtsFlags.ParFlags.ParStats.Global &&
+ RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
+ globalParStats.rec_reval_mess++;
+ }
+ break;
+#endif
+
+ default:
+ /* Anything we're not prepared to deal with. */
+ barf("Task %x: Unexpected opcode %x from %x",
+ mytid, opcode, task);
+ } /* switch */
+
+ } while (PacketsWaiting()); /* While there are messages: process them */
+ return receivedFinish;
+} /* processMessages */
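+
+/* A minimal sketch of how the main scheduling loop might drive
+   processMessages, given that it blocks unless messages are already waiting
+   (hypothetical caller; the real loop lives elsewhere): */
+#if 0
+  rtsBool receivedFinish = rtsFalse;
+  if (PacketsWaiting()) {        /* otherwise processMessages() would block */
+    receivedFinish = processMessages();
+  }
+  if (receivedFinish) {
+    /* propagate shutdown of this PE */
+  }
+#endif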
+
+//@node Miscellaneous Functions, Index, GUM Message Processor, High Level Communications Routines
+//@subsection Miscellaneous Functions
+
+/*
+ * blockFetch blocks a BlockedFetch node on some kind of black hole.
+ */
+//@cindex blockFetch
+void
+blockFetch(StgBlockedFetch *bf, StgClosure *bh) {
+ bf->node = bh;
+ switch (get_itbl(bh)->type) {
+ case BLACKHOLE:
+ bf->link = END_BQ_QUEUE;
+ //((StgBlockingQueue *)bh)->header.info = &stg_BLACKHOLE_BQ_info;
+ SET_INFO(bh, &stg_BLACKHOLE_BQ_info); // turn closure into a blocking queue
+ ((StgBlockingQueue *)bh)->blocking_queue = (StgBlockingQueueElement *)bf;
+
+ // put bh on the mutables list
+ recordMutable((StgMutClosure *)bh);
+ break;
+
+ case BLACKHOLE_BQ:
+ /* enqueue bf on blocking queue of closure bh */
+ bf->link = ((StgBlockingQueue *)bh)->blocking_queue;
+ ((StgBlockingQueue *)bh)->blocking_queue = (StgBlockingQueueElement *)bf;
+
+ // put bh on the mutables list; ToDo: check
+ recordMutable((StgMutClosure *)bh);
+ break;
+
+ case FETCH_ME_BQ:
+ /* enqueue bf on blocking queue of closure bh */
+ bf->link = ((StgFetchMeBlockingQueue *)bh)->blocking_queue;
+ ((StgFetchMeBlockingQueue *)bh)->blocking_queue = (StgBlockingQueueElement *)bf;
+
+ // put bh on the mutables list; ToDo: check
+ recordMutable((StgMutClosure *)bh);
+ break;
+
+ case RBH:
+ /* enqueue bf on blocking queue of closure bh */
+ bf->link = ((StgRBH *)bh)->blocking_queue;
+ ((StgRBH *)bh)->blocking_queue = (StgBlockingQueueElement *)bf;
+
+ // put bh on the mutables list; ToDo: check
+ recordMutable((StgMutClosure *)bh);
+ break;
+
+ default:
+ barf("blockFetch: thought %p was a black hole (IP %#lx, %s)",
+ (StgClosure *)bh, get_itbl((StgClosure *)bh),
+ info_type((StgClosure *)bh));
+ }
+ IF_PAR_DEBUG(bq,
+ belch("##++ blockFetch: after block the BQ of %p (%s) is:",
+ bh, info_type(bh));
+ print_bq(bh));
+}
+
+
+/*
+ @blockThread@ is called from the main scheduler whenever tso returns with
+ a ThreadBlocked return code; tso has already been added to a blocking
+ queue (that's done in the entry code of the closure, because it is a
+ cheap operation we have to do in any case); the main purpose of this
+ routine is to send a Fetch message in case we are blocking on a FETCHME(_BQ)
+ closure, which is indicated by the tso.why_blocked field;
+ we also write an entry into the log file if we are generating one
+
+ Strictly speaking, exectime etc should already be updated in the entry
+ code; but we don't have anything like ``system time'' in the log file
+ anyway, so this should even out the inaccuracies.
+*/
+
+//@cindex blockThread
+void
+blockThread(StgTSO *tso)
+{
+ globalAddr *remote_ga=NULL;
+ globalAddr *local_ga;
+ globalAddr fmbq_ga;
+
+ // ASSERT(we are on some blocking queue)
+ ASSERT(tso->block_info.closure != (StgClosure *)NULL);
+
+ /*
+ We have to check why this thread has been blocked.
+ */
+ switch (tso->why_blocked) {
+ case BlockedOnGA:
+ /* the closure must be a FETCH_ME_BQ; tso came in here via
+ FETCH_ME entry code */
+ ASSERT(get_itbl(tso->block_info.closure)->type==FETCH_ME_BQ);
+
+ /* HACK: the link field is used to hold the GA between FETCH_ME_entry
+ and this point; if something (e.g. GC) happens in between, the whole
+ thing will blow up.
+ The problem is that the ga field of the FETCH_ME has been overwritten
+ with the head of the blocking queue (which is tso).
+ */
+ ASSERT(looks_like_ga(&theGlobalFromGA));
+ // ASSERT(tso->link!=END_TSO_QUEUE && tso->link!=NULL);
+ remote_ga = &theGlobalFromGA; //tso->link;
+ tso->link = (StgTSO*)END_BQ_QUEUE;
+ /* it was tso which turned node from FETCH_ME into FETCH_ME_BQ =>
+ we have to send a Fetch message here! */
+ if (RtsFlags.ParFlags.ParStats.Full) {
+ /* Note that CURRENT_TIME may perform an unsafe call */
+ tso->par.exectime += CURRENT_TIME - tso->par.blockedat;
+ tso->par.fetchcount++;
+ tso->par.blockedat = CURRENT_TIME;
+ /* we are about to send off a FETCH message, so dump a FETCH event */
+ DumpRawGranEvent(CURRENT_PROC,
+ taskIDtoPE(remote_ga->payload.gc.gtid),
+ GR_FETCH, tso, tso->block_info.closure, 0, 0);
+ }
+ /* Phil T. claims that this was a workaround for a hard-to-find
+ * bug, hence I'm leaving it out for now --SDM
+ */
+ /* Assign a brand-new global address to the newly created FMBQ */
+ local_ga = makeGlobal(tso->block_info.closure, rtsFalse);
+ splitWeight(&fmbq_ga, local_ga);
+ ASSERT(fmbq_ga.weight == 1U << (BITS_IN(unsigned) - 1));
+
+ sendFetch(remote_ga, &fmbq_ga, 0/*load*/);
+
+ // Global statistics: count no. of fetches
+ if (RtsFlags.ParFlags.ParStats.Global &&
+ RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
+ globalParStats.tot_fetch_mess++;
+ }
+
+ IF_DEBUG(sanity,
+ theGlobalFromGA.payload.gc.gtid = (GlobalTaskId)0);
+ break;
+
+ case BlockedOnGA_NoSend:
+ /* the closure must be a FETCH_ME_BQ; tso came in here via
+ FETCH_ME_BQ entry code */
+ ASSERT(get_itbl(tso->block_info.closure)->type==FETCH_ME_BQ);
+
+ /* Fetch message has been sent already */
+ if (RtsFlags.ParFlags.ParStats.Full) {
+ /* Note that CURRENT_TIME may perform an unsafe call */
+ tso->par.exectime += CURRENT_TIME - tso->par.blockedat;
+ tso->par.blockcount++;
+ tso->par.blockedat = CURRENT_TIME;
+ /* dump a block event, because fetch has been sent already */
+ DumpRawGranEvent(CURRENT_PROC, thisPE,
+ GR_BLOCK, tso, tso->block_info.closure, 0, 0);
+ }
+ break;
+
+ case BlockedOnMVar:
+ case BlockedOnBlackHole:
+ /* the closure must be a BLACKHOLE_BQ or an RBH; tso came in here via
+ BLACKHOLE(_BQ) or CAF_BLACKHOLE or RBH entry code */
+ ASSERT(get_itbl(tso->block_info.closure)->type==MVAR ||
+ get_itbl(tso->block_info.closure)->type==BLACKHOLE_BQ ||
+ get_itbl(tso->block_info.closure)->type==RBH);
+
+ /* if collecting stats update the execution time etc */
+ if (RtsFlags.ParFlags.ParStats.Full) {
+ /* Note that CURRENT_TIME may perform an unsafe call */
+ tso->par.exectime += CURRENT_TIME - tso->par.blockedat;
+ tso->par.blockcount++;
+ tso->par.blockedat = CURRENT_TIME;
+ DumpRawGranEvent(CURRENT_PROC, thisPE,
+ GR_BLOCK, tso, tso->block_info.closure, 0, 0);
+ }
+ break;
+
+ case BlockedOnDelay:
+ /* What sort of stats should we collect for an explicit threadDelay? */
+ IF_PAR_DEBUG(verbose,
+ belch("##++ blockThread: TSO %d blocked on ThreadDelay",
+ tso->id));
+ break;
+
+ /* ToDo: check that the following cases cannot occur here:
+ case BlockedOnException:
+ case BlockedOnRead:
+ case BlockedOnWrite:
+ */
+ default:
+ barf("blockThread: impossible why_blocked code %d for TSO %d",
+ tso->why_blocked, tso->id);
+ }
+
+ IF_PAR_DEBUG(verbose,
+ belch("##++ blockThread: TSO %d blocked on closure %p (%s); %s",
+ tso->id, tso->block_info.closure, info_type(tso->block_info.closure),
+ (tso->why_blocked==BlockedOnGA) ? "Sent FETCH for GA" : ""));
+
+ IF_PAR_DEBUG(bq,
+ print_bq(tso->block_info.closure));
+}
+
+/*
+ * ChoosePE selects a GlobalTaskId from the array of PEs 'at random'.
+ * Important properties:
+ * - it varies during execution, even if the PE is idle
+ * - it's different for each PE
+ * - we never send a fish to ourselves
+ */
+extern long lrand48 (void);
+
+//@cindex choosePE
+GlobalTaskId
+choosePE(void)
+{
+ long temp;
+
+ temp = lrand48() % nPEs;
+ if (allPEs[temp] == mytid) { /* Never send a FISH to yourself */
+ temp = (temp + 1) % nPEs;
+ }
+ return allPEs[temp];
+}
+
+/*
+ * allocate a BLOCKED_FETCH closure, point it at the local closure for ga
+ * and record the remote GA (rga) to reply to; called from processFetch
+ * when the local closure is under evaluation
+ */
+//@cindex createBlockedFetch
+StgClosure *
+createBlockedFetch (globalAddr ga, globalAddr rga)
+{
+ StgBlockedFetch *bf;
+ StgClosure *closure;
+
+ closure = GALAlookup(&ga);
+ if ((bf = (StgBlockedFetch *)allocate(_HS + sizeofW(StgBlockedFetch))) == NULL) {
+ barf("createBlockedFetch: out of heap while allocating heap for a BlocekdFetch; ToDo: call GC here");
+ GarbageCollect(GetRoots, rtsFalse);
+ closure = GALAlookup(&ga);
+ bf = (StgBlockedFetch *)allocate(_HS + sizeofW(StgBlockedFetch));
+ // ToDo: check whether really guaranteed to succeed 2nd time around
+ }
+
+ ASSERT(bf != (StgBlockedFetch *)NULL);
+ SET_INFO((StgClosure *)bf, &stg_BLOCKED_FETCH_info);
+ // ToDo: check whether other header info is needed
+ bf->node = closure;
+ bf->ga.payload.gc.gtid = rga.payload.gc.gtid;
+ bf->ga.payload.gc.slot = rga.payload.gc.slot;
+ bf->ga.weight = rga.weight;
+ // bf->link = NULL; debugging
+
+ IF_PAR_DEBUG(schedule,
+ fprintf(stderr, "%%%%// created BF: bf=%p (%s) of closure , GA: ",
+ bf, info_type((StgClosure*)bf));
+ printGA(&(bf->ga));
+ fputc('\n',stderr));
+ return (StgClosure *)bf;
+}
+
+/*
+ * waitForTermination enters a loop ignoring spurious messages while
+ * waiting for the termination sequence to be completed.
+ */
+//@cindex waitForTermination
+void
+waitForTermination(void)
+{
+ do {
+ rtsPacket p = GetPacket();
+ processUnexpectedMessage(p);
+ } while (rtsTrue);
+}
+
+#ifdef DEBUG
+//@cindex DebugPrintGAGAMap
+void
+DebugPrintGAGAMap(globalAddr *gagamap, int nGAs)
+{
+ nat i;
+
+ for (i = 0; i < nGAs; ++i, gagamap += 2)
+ fprintf(stderr, "__ gagamap[%d] = ((%x, %d, %x)) -> ((%x, %d, %x))\n", i,
+ gagamap[0].payload.gc.gtid, gagamap[0].payload.gc.slot, gagamap[0].weight,
+ gagamap[1].payload.gc.gtid, gagamap[1].payload.gc.slot, gagamap[1].weight);
+}
+
+//@cindex checkGAGAMap
+void
+checkGAGAMap(globalAddr *gagamap, int nGAs)
+{
+ nat i;
+
+ for (i = 0; i < (nat)nGAs; ++i, gagamap += 2) {
+ ASSERT(looks_like_ga(gagamap));
+ ASSERT(looks_like_ga(gagamap+1));
+ }
+}
+#endif
+
+//@cindex freeMsgBuffer
+static StgWord **freeMsgBuffer = NULL;
+//@cindex freeMsgIndex
+static nat *freeMsgIndex = NULL;
+
+//@cindex prepareFreeMsgBuffers
+void
+prepareFreeMsgBuffers(void)
+{
+ nat i;
+
+ /* Allocate the freeMsg buffers just once and then hang onto them. */
+ if (freeMsgIndex == NULL) {
+ freeMsgIndex = (nat *) stgMallocBytes(nPEs * sizeof(nat),
+ "prepareFreeMsgBuffers (Index)");
+ freeMsgBuffer = (StgWord **) stgMallocBytes(nPEs * sizeof(long *),
+ "prepareFreeMsgBuffers (Buffer)");
+
+ for(i = 0; i < nPEs; i++)
+ if (i != (thisPE-1))
+ freeMsgBuffer[i] = (StgPtr) stgMallocWords(RtsFlags.ParFlags.packBufferSize,
+ "prepareFreeMsgBuffers (Buffer #i)");
+ else
+ freeMsgBuffer[i] = 0;
+ }
+
+ /* Initialize the freeMsg buffer pointers to point to the start of their
+ buffers */
+ for (i = 0; i < nPEs; i++)
+ freeMsgIndex[i] = 0;
+}
+
+//@cindex freeRemoteGA
+void
+freeRemoteGA(int pe, globalAddr *ga)
+{
+ nat i;
+
+ ASSERT(GALAlookup(ga) == NULL);
+
+ if ((i = freeMsgIndex[pe]) + 2 >= RtsFlags.ParFlags.packBufferSize) {
+ IF_PAR_DEBUG(free,
+ belch("!! Filled a free message buffer (sending remaining messages indivisually)"));
+
+ sendFree(ga->payload.gc.gtid, i, freeMsgBuffer[pe]);
+ i = 0;
+ }
+ freeMsgBuffer[pe][i++] = (StgWord) ga->weight;
+ freeMsgBuffer[pe][i++] = (StgWord) ga->payload.gc.slot;
+ freeMsgIndex[pe] = i;
+
+ IF_DEBUG(sanity,
+ ga->weight = 0xdead0add;
+ ga->payload.gc.gtid = 0xbbbbbbbb;
+ ga->payload.gc.slot = 0xbbbbbbbb;);
+}
+
+//@cindex sendFreeMessages
+void
+sendFreeMessages(void)
+{
+ nat i;
+
+ for (i = 0; i < nPEs; i++)
+ if (freeMsgIndex[i] > 0)
+ sendFree(allPEs[i], freeMsgIndex[i], freeMsgBuffer[i]);
+}
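+
+/* Rough sketch of the intended calling sequence for the free-message buffers
+   above (the real caller is the GA-table maintenance code elsewhere in the
+   parallel RTS; pe_index and ga are placeholders):
+
+     prepareFreeMsgBuffers();        // reset the per-PE buffers
+     ...
+     freeRemoteGA(pe_index, ga);     // once per GA we no longer reference;
+                                     // note the GA must already have been
+                                     // dropped from the GALA table (ASSERT)
+     ...
+     sendFreeMessages();             // flush whatever is left, per PE
+*/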
+
+/* synchronises with the other PEs. Receives and records in a global
+ * variable the task-id of SysMan. If this is the main thread (discovered
+ * in main.lc), identifies itself to SysMan. Finally it receives
+ * from SysMan an array of the Global Task Ids of each PE, which is
+ * returned as the value of the function.
+ */
+
+#if defined(PAR_TICKY)
+/* Has to see freeMsgIndex, so must be defined here not in ParTicky.c */
+//@cindex stats_CntFreeGA
+void
+stats_CntFreeGA (void) { // stats only
+
+ // Global statistics: number and residency of pending free-GA entries
+ if (RtsFlags.ParFlags.ParStats.Global &&
+ RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
+ nat i, s;
+
+ globalParStats.cnt_free_GA++;
+ for (i = 0, s = 0; i < nPEs; i++) {
+ globalParStats.tot_free_GA += freeMsgIndex[i]/2;
+ s += freeMsgIndex[i]/2;
+ }
+
+ if ( s > globalParStats.res_free_GA )
+ globalParStats.res_free_GA = s;
+ }
+}
+#endif /* PAR_TICKY */
+
+#endif /* PAR -- whole file */
+
+//@node Index, , Miscellaneous Functions, High Level Communications Routines
+//@subsection Index
+
+//@index
+//* ACK:: @cindex\s-+ACK
+//* DebugPrintGAGAMap:: @cindex\s-+DebugPrintGAGAMap
+//* FETCH:: @cindex\s-+FETCH
+//* FISH:: @cindex\s-+FISH
+//* FREE:: @cindex\s-+FREE
+//* RESUME:: @cindex\s-+RESUME
+//* SCHEDULE:: @cindex\s-+SCHEDULE
+//* blockFetch:: @cindex\s-+blockFetch
+//* choosePE:: @cindex\s-+choosePE
+//* freeMsgBuffer:: @cindex\s-+freeMsgBuffer
+//* freeMsgIndex:: @cindex\s-+freeMsgIndex
+//* freeRemoteGA:: @cindex\s-+freeRemoteGA
+//* gumPackBuffer:: @cindex\s-+gumPackBuffer
+//* initMoreBuffers:: @cindex\s-+initMoreBuffers
+//* prepareFreeMsgBuffers:: @cindex\s-+prepareFreeMsgBuffers
+//* processAck:: @cindex\s-+processAck
+//* processFetch:: @cindex\s-+processFetch
+//* processFetches:: @cindex\s-+processFetches
+//* processFish:: @cindex\s-+processFish
+//* processFree:: @cindex\s-+processFree
+//* processMessages:: @cindex\s-+processMessages
+//* processResume:: @cindex\s-+processResume
+//* processSchedule:: @cindex\s-+processSchedule
+//* sendAck:: @cindex\s-+sendAck
+//* sendFetch:: @cindex\s-+sendFetch
+//* sendFish:: @cindex\s-+sendFish
+//* sendFree:: @cindex\s-+sendFree
+//* sendFreeMessages:: @cindex\s-+sendFreeMessages
+//* sendResume:: @cindex\s-+sendResume
+//* sendSchedule:: @cindex\s-+sendSchedule
+//* unpackAck:: @cindex\s-+unpackAck
+//* unpackFetch:: @cindex\s-+unpackFetch
+//* unpackFish:: @cindex\s-+unpackFish
+//* unpackFree:: @cindex\s-+unpackFree
+//* unpackResume:: @cindex\s-+unpackResume
+//* unpackSchedule:: @cindex\s-+unpackSchedule
+//* waitForTermination:: @cindex\s-+waitForTermination
+//@end index