summaryrefslogtreecommitdiff
path: root/rts/parallel/GranSim.c
diff options
context:
space:
mode:
authorSimon Marlow <simonmar@microsoft.com>2006-04-07 02:05:11 +0000
committerSimon Marlow <simonmar@microsoft.com>2006-04-07 02:05:11 +0000
commit0065d5ab628975892cea1ec7303f968c3338cbe1 (patch)
tree8e2afe0ab48ee33cf95009809d67c9649573ef92 /rts/parallel/GranSim.c
parent28a464a75e14cece5db40f2765a29348273ff2d2 (diff)
downloadhaskell-0065d5ab628975892cea1ec7303f968c3338cbe1.tar.gz
Reorganisation of the source tree
Most of the other users of the fptools build system have migrated to Cabal, and with the move to darcs we can now flatten the source tree without losing history, so here goes. The main change is that the ghc/ subdir is gone, and most of what it contained is now at the top level. The build system now makes no pretense at being multi-project, it is just the GHC build system. No doubt this will break many things, and there will be a period of instability while we fix the dependencies. A straightforward build should work, but I haven't yet fixed binary/source distributions. Changes to the Building Guide will follow, too.
Diffstat (limited to 'rts/parallel/GranSim.c')
-rw-r--r--rts/parallel/GranSim.c3015
1 files changed, 3015 insertions, 0 deletions
diff --git a/rts/parallel/GranSim.c b/rts/parallel/GranSim.c
new file mode 100644
index 0000000000..b1cc0962be
--- /dev/null
+++ b/rts/parallel/GranSim.c
@@ -0,0 +1,3015 @@
+/*
+ Time-stamp: <Tue Mar 06 2001 00:17:42 Stardate: [-30]6285.06 hwloidl>
+
+ Variables and functions specific to GranSim the parallelism simulator
+ for GPH.
+*/
+
+//@node GranSim specific code, , ,
+//@section GranSim specific code
+
+/*
+ Macros for dealing with the new and improved GA field for simulating
+ parallel execution. Based on @CONCURRENT@ package. The GA field now
+ contains a mask, where the n-th bit stands for the n-th processor, where
+ this data can be found. In case of multiple copies, several bits are
+ set. The total number of processors is bounded by @MAX_PROC@, which
+ should be <= the length of a word in bits. -- HWL
+*/
+
+//@menu
+//* Includes::
+//* Prototypes and externs::
+//* Constants and Variables::
+//* Initialisation::
+//* Global Address Operations::
+//* Global Event Queue::
+//* Spark queue functions::
+//* Scheduling functions::
+//* Thread Queue routines::
+//* GranSim functions::
+//* GranSimLight routines::
+//* Code for Fetching Nodes::
+//* Idle PEs::
+//* Routines directly called from Haskell world::
+//* Emiting profiling info for GrAnSim::
+//* Dumping routines::
+//* Index::
+//@end menu
+
+//@node Includes, Prototypes and externs, GranSim specific code, GranSim specific code
+//@subsection Includes
+
+#include "Rts.h"
+#include "RtsFlags.h"
+#include "RtsUtils.h"
+#include "StgMiscClosures.h"
+#include "StgTypes.h"
+#include "Schedule.h"
+#include "SchedAPI.h" // for pushClosure
+#include "GranSimRts.h"
+#include "GranSim.h"
+#include "ParallelRts.h"
+#include "ParallelDebug.h"
+#include "Sparks.h"
+#include "Storage.h" // for recordMutable
+
+
+//@node Prototypes and externs, Constants and Variables, Includes, GranSim specific code
+//@subsection Prototypes and externs
+
+#if defined(GRAN)
+
+/* Prototypes */
+static inline PEs ga_to_proc(StgWord);
+static inline rtsBool any_idle(void);
+static inline nat idlers(void);
+ PEs where_is(StgClosure *node);
+
+static rtsBool stealSomething(PEs proc, rtsBool steal_spark, rtsBool steal_thread);
+static rtsBool stealSpark(PEs proc);
+static rtsBool stealThread(PEs proc);
+static rtsBool stealSparkMagic(PEs proc);
+static rtsBool stealThreadMagic(PEs proc);
+/* subsumed by stealSomething
+static void stealThread(PEs proc);
+static void stealSpark(PEs proc);
+*/
+static rtsTime sparkStealTime(void);
+static nat natRandom(nat from, nat to);
+static PEs findRandomPE(PEs proc);
+static void sortPEsByTime (PEs proc, PEs *pes_by_time,
+ nat *firstp, nat *np);
+
+void GetRoots(void);
+
+#endif /* GRAN */
+
+//@node Constants and Variables, Initialisation, Prototypes and externs, GranSim specific code
+//@subsection Constants and Variables
+
+#if defined(GRAN) || defined(PAR)
+/* See GranSim.h for the definition of the enum gran_event_types */
+char *gran_event_names[] = {
+ "START", "START(Q)",
+ "STEALING", "STOLEN", "STOLEN(Q)",
+ "FETCH", "REPLY", "BLOCK", "RESUME", "RESUME(Q)",
+ "SCHEDULE", "DESCHEDULE",
+ "END",
+ "SPARK", "SPARKAT", "USED", "PRUNED", "EXPORTED", "ACQUIRED",
+ "ALLOC",
+ "TERMINATE",
+ "SYSTEM_START", "SYSTEM_END", /* only for debugging */
+ "??"
+};
+#endif
+
+#if defined(GRAN) /* whole file */
+char *proc_status_names[] = {
+ "Idle", "Sparking", "Starting", "Fetching", "Fishing", "Busy",
+ "UnknownProcStatus"
+};
+
+/* For internal use (event statistics) only */
+char *event_names[] =
+ { "ContinueThread", "StartThread", "ResumeThread",
+ "MoveSpark", "MoveThread", "FindWork",
+ "FetchNode", "FetchReply",
+ "GlobalBlock", "UnblockThread"
+ };
+
+//@cindex CurrentProc
+PEs CurrentProc = 0;
+
+/*
+ ToDo: Create a structure for the processor status and put all the
+ arrays below into it.
+ -- HWL */
+
+//@cindex CurrentTime
+/* One clock for each PE */
+rtsTime CurrentTime[MAX_PROC];
+
+/* Useful to restrict communication; cf fishing model in GUM */
+nat OutstandingFetches[MAX_PROC], OutstandingFishes[MAX_PROC];
+
+/* Status of each PE (new since but independent of GranSim Light) */
+rtsProcStatus procStatus[MAX_PROC];
+
+# if defined(GRAN) && defined(GRAN_CHECK)
+/* To check if the RTS ever tries to run a thread that should be blocked
+ because of fetching remote data */
+StgTSO *BlockedOnFetch[MAX_PROC];
+# define FETCH_MASK_TSO 0x08000000 /* only bits 0, 1, 2 should be used */
+# endif
+
+nat SparksAvail = 0; /* How many sparks are available */
+nat SurplusThreads = 0; /* How many excess threads are there */
+
+/* Do we need to reschedule following a fetch? */
+rtsBool NeedToReSchedule = rtsFalse, IgnoreEvents = rtsFalse, IgnoreYields = rtsFalse;
+rtsTime TimeOfNextEvent, TimeOfLastEvent, EndOfTimeSlice; /* checked from the threaded world! */
+
+//@cindex spark queue
+/* GranSim: a globally visible array of spark queues */
+rtsSparkQ pending_sparks_hds[MAX_PROC];
+rtsSparkQ pending_sparks_tls[MAX_PROC];
+
+nat sparksIgnored = 0, sparksCreated = 0;
+
+GlobalGranStats globalGranStats;
+
+nat gran_arith_cost, gran_branch_cost, gran_load_cost,
+ gran_store_cost, gran_float_cost;
+
+/*
+Old comment from 0.29. ToDo: Check and update -- HWL
+
+The following variables control the behaviour of GrAnSim. In general, there
+is one RTS option for enabling each of these features. In getting the
+desired setup of GranSim the following questions have to be answered:
+\begin{itemize}
+\item {\em Which scheduling algorithm} to use (@RtsFlags.GranFlags.DoFairSchedule@)?
+ Currently only unfair scheduling is supported.
+\item What to do when remote data is fetched (@RtsFlags.GranFlags.DoAsyncFetch@)?
+ Either block and wait for the
+ data or reschedule and do some other work.
+ Thus, if this variable is true, asynchronous communication is
+ modelled. Block on fetch mainly makes sense for incremental fetching.
+
+ There is also a simplified fetch variant available
+ (@RtsFlags.GranFlags.SimplifiedFetch@). This variant does not use events to model
+ communication. It is faster but the results will be less accurate.
+\item How aggressive to be in getting work after a reschedule on fetch
+ (@RtsFlags.GranFlags.FetchStrategy@)?
+ This is determined by the so-called {\em fetching
+ strategy\/}. Currently, there are four possibilities:
+ \begin{enumerate}
+ \item Only run a runnable thread.
+ \item Turn a spark into a thread, if necessary.
+ \item Steal a remote spark, if necessary.
+ \item Steal a runnable thread from another processor, if necessary.
+ \end{itemize}
+ The variable @RtsFlags.GranFlags.FetchStrategy@ determines how far to go in this list
+ when rescheduling on a fetch.
+\item Should sparks or threads be stolen first when looking for work
+ (@RtsFlags.GranFlags.DoStealThreadsFirst@)?
+ The default is to steal sparks first (much cheaper).
+\item Should the RTS use a lazy thread creation scheme
+ (@RtsFlags.GranFlags.DoAlwaysCreateThreads@)? By default yes i.e.\ sparks are only
+ turned into threads when work is needed. Also note, that sparks
+ can be discarded by the RTS (this is done in the case of an overflow
+ of the spark pool). Setting @RtsFlags.GranFlags.DoAlwaysCreateThreads@ to @True@ forces
+ the creation of threads at the next possibility (i.e.\ when new work
+ is demanded the next time).
+\item Should data be fetched closure-by-closure or in packets
+ (@RtsFlags.GranFlags.DoBulkFetching@)? The default strategy is a GRIP-like incremental
+ (i.e.\ closure-by-closure) strategy. This makes sense in a
+ low-latency setting but is bad in a high-latency system. Setting
+ @RtsFlags.GranFlags.DoBulkFetching@ to @True@ enables bulk (packet) fetching. Other
+ parameters determine the size of the packets (@pack_buffer_size@) and the number of
+ thunks that should be put into one packet (@RtsFlags.GranFlags.ThunksToPack@).
+\item If there is no other possibility to find work, should runnable threads
+ be moved to an idle processor (@RtsFlags.GranFlags.DoThreadMigration@)? In any case, the
+ RTS tried to get sparks (either local or remote ones) first. Thread
+ migration is very expensive, since a whole TSO has to be transferred
+ and probably data locality becomes worse in the process. Note, that
+ the closure, which will be evaluated next by that TSO is not
+ transferred together with the TSO (that might block another thread).
+\item Should the RTS distinguish between sparks created by local nodes and
+ stolen sparks (@RtsFlags.GranFlags.PreferSparksOfLocalNodes@)? The idea is to improve
+ data locality by preferring sparks of local nodes (it is more likely
+ that the data for those sparks is already on the local processor).
+ However, such a distinction also imposes an overhead on the spark
+ queue management, and typically a large number of sparks are
+ generated during execution. By default this variable is set to @False@.
+\item Should the RTS use granularity control mechanisms? The idea of a
+ granularity control mechanism is to make use of granularity
+ information provided via annotation of the @par@ construct in order
+ to prefer bigger threads when either turning a spark into a thread or
+ when choosing the next thread to schedule. Currently, three such
+ mechanisms are implemented:
+ \begin{itemize}
+ \item Cut-off: The granularity information is interpreted as a
+ priority. If a threshold priority is given to the RTS, then
+ only those sparks with a higher priority than the threshold
+ are actually created. Other sparks are immediately discarded.
+ This is similar to a usual cut-off mechanism often used in
+ parallel programs, where parallelism is only created if the
+ input data is lage enough. With this option, the choice is
+ hidden in the RTS and only the threshold value has to be
+ provided as a parameter to the runtime system.
+ \item Priority Sparking: This mechanism keeps priorities for sparks
+ and chooses the spark with the highest priority when turning
+ a spark into a thread. After that the priority information is
+ discarded. The overhead of this mechanism comes from
+ maintaining a sorted spark queue.
+ \item Priority Scheduling: This mechanism keeps the granularity
+ information for threads, to. Thus, on each reschedule the
+ largest thread is chosen. This mechanism has a higher
+ overhead, as the thread queue is sorted, too.
+ \end{itemize}
+\end{itemize}
+*/
+
+//@node Initialisation, Global Address Operations, Constants and Variables, GranSim specific code
+//@subsection Initialisation
+
+void
+init_gr_stats (void) {
+ memset(&globalGranStats, '\0', sizeof(GlobalGranStats));
+#if 0
+ /* event stats */
+ globalGranStats.noOfEvents = 0;
+ for (i=0; i<MAX_EVENT; i++) globalGranStats.event_counts[i]=0;
+
+ /* communication stats */
+ globalGranStats.fetch_misses = 0;
+ globalGranStats.tot_low_pri_sparks = 0;
+
+ /* obscure stats */
+ globalGranStats.rs_sp_count = 0;
+ globalGranStats.rs_t_count = 0;
+ globalGranStats.ntimes_total = 0,
+ globalGranStats.fl_total = 0;
+ globalGranStats.no_of_steals = 0;
+
+ /* spark queue stats */
+ globalGranStats.tot_sq_len = 0,
+ globalGranStats.tot_sq_probes = 0;
+ globalGranStats.tot_sparks = 0;
+ globalGranStats.withered_sparks = 0;
+ globalGranStats.tot_add_threads = 0;
+ globalGranStats.tot_tq_len = 0;
+ globalGranStats.non_end_add_threads = 0;
+
+ /* thread stats */
+ globalGranStats.tot_threads_created = 0;
+ for (i=0; i<MAX_PROC; i++) globalGranStats.threads_created_on_PE[i]=0;
+#endif /* 0 */
+}
+
+//@node Global Address Operations, Global Event Queue, Initialisation, GranSim specific code
+//@subsection Global Address Operations
+/*
+ ----------------------------------------------------------------------
+ Global Address Operations
+
+ These functions perform operations on the global-address (ga) part of a
+ closure. The ga is the only new field (1 word) in a closure introduced by
+ GrAnSim. It serves as a bitmask, indicating on which processor the
+ closure is residing. Since threads are described by Thread State Object
+ (TSO), which is nothing but another kind of closure, this scheme allows
+ gives placement information about threads.
+
+ A ga is just a bitmask, so the operations on them are mainly bitmask
+ manipulating functions. Note, that there are important macros like PROCS,
+ IS_LOCAL_TO etc. They are defined in @GrAnSim.lh@.
+
+ NOTE: In GrAnSim-light we don't maintain placement information. This
+ allows to simulate an arbitrary number of processors. The price we have
+ to be is the lack of costing any communication properly. In short,
+ GrAnSim-light is meant to reveal the maximal parallelism in a program.
+ From an implementation point of view the important thing is: {\em
+ GrAnSim-light does not maintain global-addresses}. */
+
+/* ga_to_proc returns the first processor marked in the bitmask ga.
+ Normally only one bit in ga should be set. But for PLCs all bits
+ are set. That shouldn't hurt since we only need IS_LOCAL_TO for PLCs */
+
+//@cindex ga_to_proc
+
+static inline PEs
+ga_to_proc(StgWord ga)
+{
+ PEs i;
+ for (i = 0; i < RtsFlags.GranFlags.proc && !IS_LOCAL_TO(ga, i); i++);
+ ASSERT(i<RtsFlags.GranFlags.proc);
+ return (i);
+}
+
+/* NB: This takes a *node* rather than just a ga as input */
+//@cindex where_is
+PEs
+where_is(StgClosure *node)
+{ return (ga_to_proc(PROCS(node))); }
+
+// debugging only
+//@cindex is_unique
+rtsBool
+is_unique(StgClosure *node)
+{
+ PEs i;
+ rtsBool unique = rtsFalse;
+
+ for (i = 0; i < RtsFlags.GranFlags.proc ; i++)
+ if (IS_LOCAL_TO(PROCS(node), i))
+ if (unique) // exactly 1 instance found so far
+ return rtsFalse; // found a 2nd instance => not unique
+ else
+ unique = rtsTrue; // found 1st instance
+ ASSERT(unique); // otherwise returned from within loop
+ return (unique);
+}
+
+//@cindex any_idle
+static inline rtsBool
+any_idle(void) { /* any (map (\ i -> procStatus[i] == Idle)) [0,..,MAX_PROC] */
+ PEs i;
+ rtsBool any_idle;
+ for(i=0, any_idle=rtsFalse;
+ !any_idle && i<RtsFlags.GranFlags.proc;
+ any_idle = any_idle || procStatus[i] == Idle, i++)
+ {} ;
+}
+
+//@cindex idlers
+static inline nat
+idlers(void) { /* number of idle PEs */
+ PEs i, j;
+ for(i=0, j=0;
+ i<RtsFlags.GranFlags.proc;
+ j += (procStatus[i] == Idle) ? 1 : 0, i++)
+ {} ;
+ return j;
+}
+
+//@node Global Event Queue, Spark queue functions, Global Address Operations, GranSim specific code
+//@subsection Global Event Queue
+/*
+The following routines implement an ADT of an event-queue (FIFO).
+ToDo: Put that in an own file(?)
+*/
+
+/* Pointer to the global event queue; events are currently malloc'ed */
+rtsEventQ EventHd = NULL;
+
+//@cindex get_next_event
+rtsEvent *
+get_next_event(void)
+{
+ static rtsEventQ entry = NULL;
+
+ if (EventHd == NULL) {
+ barf("No next event. This may be caused by a circular data dependency in the program.");
+ }
+
+ if (entry != NULL)
+ free((char *)entry);
+
+ if (RtsFlags.GranFlags.GranSimStats.Global) { /* count events */
+ globalGranStats.noOfEvents++;
+ globalGranStats.event_counts[EventHd->evttype]++;
+ }
+
+ entry = EventHd;
+
+ IF_GRAN_DEBUG(event_trace,
+ print_event(entry));
+
+ EventHd = EventHd->next;
+ return(entry);
+}
+
+/* When getting the time of the next event we ignore CONTINUETHREAD events:
+ we don't want to be interrupted before the end of the current time slice
+ unless there is something important to handle.
+*/
+//@cindex get_time_of_next_event
+rtsTime
+get_time_of_next_event(void)
+{
+ rtsEventQ event = EventHd;
+
+ while (event != NULL && event->evttype==ContinueThread) {
+ event = event->next;
+ }
+ if(event == NULL)
+ return ((rtsTime) 0);
+ else
+ return (event->time);
+}
+
+/* ToDo: replace malloc/free with a free list */
+//@cindex insert_event
+void
+insert_event(newentry)
+rtsEvent *newentry;
+{
+ rtsEventType evttype = newentry->evttype;
+ rtsEvent *event, **prev;
+
+ /* if(evttype >= CONTINUETHREAD1) evttype = CONTINUETHREAD; */
+
+ /* Search the queue and insert at the right point:
+ FINDWORK before everything, CONTINUETHREAD after everything.
+
+ This ensures that we find any available work after all threads have
+ executed the current cycle. This level of detail would normally be
+ irrelevant, but matters for ridiculously low latencies...
+ */
+
+ /* Changed the ordering: Now FINDWORK comes after everything but
+ CONTINUETHREAD. This makes sure that a MOVESPARK comes before a
+ FINDWORK. This is important when a GranSimSparkAt happens and
+ DoAlwaysCreateThreads is turned on. Also important if a GC occurs
+ when trying to build a new thread (see much_spark) -- HWL 02/96 */
+
+ if(EventHd == NULL)
+ EventHd = newentry;
+ else {
+ for (event = EventHd, prev=(rtsEvent**)&EventHd;
+ event != NULL;
+ prev = (rtsEvent**)&(event->next), event = event->next) {
+ switch (evttype) {
+ case FindWork: if ( event->time < newentry->time ||
+ ( (event->time == newentry->time) &&
+ (event->evttype != ContinueThread) ) )
+ continue;
+ else
+ break;
+ case ContinueThread: if ( event->time <= newentry->time )
+ continue;
+ else
+ break;
+ default: if ( event->time < newentry->time ||
+ ((event->time == newentry->time) &&
+ (event->evttype == newentry->evttype)) )
+ continue;
+ else
+ break;
+ }
+ /* Insert newentry here (i.e. before event) */
+ *prev = newentry;
+ newentry->next = event;
+ break;
+ }
+ if (event == NULL)
+ *prev = newentry;
+ }
+}
+
+//@cindex new_event
+void
+new_event(proc,creator,time,evttype,tso,node,spark)
+PEs proc, creator;
+rtsTime time;
+rtsEventType evttype;
+StgTSO *tso;
+StgClosure *node;
+rtsSpark *spark;
+{
+ rtsEvent *newentry = (rtsEvent *) stgMallocBytes(sizeof(rtsEvent), "new_event");
+
+ newentry->proc = proc;
+ newentry->creator = creator;
+ newentry->time = time;
+ newentry->evttype = evttype;
+ newentry->tso = tso;
+ newentry->node = node;
+ newentry->spark = spark;
+ newentry->gc_info = 0;
+ newentry->next = NULL;
+
+ insert_event(newentry);
+
+ IF_DEBUG(gran,
+ fprintf(stderr, "GRAN: new_event: \n");
+ print_event(newentry));
+}
+
+//@cindex prepend_event
+void
+prepend_event(event) /* put event at beginning of EventQueue */
+rtsEvent *event;
+{ /* only used for GC! */
+ event->next = EventHd;
+ EventHd = event;
+}
+
+//@cindex grab_event
+rtsEventQ
+grab_event(void) /* undo prepend_event i.e. get the event */
+{ /* at the head of EventQ but don't free anything */
+ rtsEventQ event = EventHd;
+
+ if (EventHd == NULL) {
+ barf("No next event (in grab_event). This may be caused by a circular data dependency in the program.");
+ }
+
+ EventHd = EventHd->next;
+ return (event);
+}
+
+//@cindex traverse_eventq_for_gc
+void
+traverse_eventq_for_gc(void)
+{
+ rtsEventQ event = EventHd;
+ StgWord bufsize;
+ StgClosure *closurep;
+ StgTSO *tsop;
+ StgPtr buffer, bufptr;
+ PEs proc, creator;
+
+ /* Traverse eventq and replace every FETCHREPLY by a FETCHNODE for the
+ orig closure (root of packed graph). This means that a graph, which is
+ between processors at the time of GC is fetched again at the time when
+ it would have arrived, had there been no GC. Slightly inaccurate but
+ safe for GC.
+ This is only needed for GUM style fetchng. -- HWL */
+ if (!RtsFlags.GranFlags.DoBulkFetching)
+ return;
+
+ for(event = EventHd; event!=NULL; event=event->next) {
+ if (event->evttype==FetchReply) {
+ buffer = stgCast(StgPtr,event->node);
+ ASSERT(buffer[PACK_FLAG_LOCN]==MAGIC_PACK_FLAG); /* It's a pack buffer */
+ bufsize = buffer[PACK_SIZE_LOCN];
+ closurep = stgCast(StgClosure*,buffer[PACK_HDR_SIZE]);
+ tsop = stgCast(StgTSO*,buffer[PACK_TSO_LOCN]);
+ proc = event->proc;
+ creator = event->creator; /* similar to unpacking */
+ for (bufptr=buffer+PACK_HDR_SIZE;
+ bufptr<(buffer+bufsize);
+ bufptr++) {
+ // if ( (INFO_TYPE(INFO_PTR(*bufptr)) == INFO_SPEC_RBH_TYPE) ||
+ // (INFO_TYPE(INFO_PTR(*bufptr)) == INFO_GEN_RBH_TYPE) ) {
+ if ( GET_INFO(stgCast(StgClosure*,bufptr)) ) {
+ convertFromRBH(stgCast(StgClosure *,bufptr));
+ }
+ }
+ free(buffer);
+ event->evttype = FetchNode;
+ event->proc = creator;
+ event->creator = proc;
+ event->node = closurep;
+ event->tso = tsop;
+ event->gc_info = 0;
+ }
+ }
+}
+
+void
+markEventQueue(void)
+{
+ StgClosure *MarkRoot(StgClosure *root); // prototype
+
+ rtsEventQ event = EventHd;
+ nat len;
+
+ /* iterate over eventq and register relevant fields in event as roots */
+ for(event = EventHd, len = 0; event!=NULL; event=event->next, len++) {
+ switch (event->evttype) {
+ case ContinueThread:
+ event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
+ break;
+ case StartThread:
+ event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
+ event->node = (StgClosure *)MarkRoot((StgClosure *)event->node);
+ break;
+ case ResumeThread:
+ event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
+ event->node = (StgClosure *)MarkRoot((StgClosure *)event->node);
+ break;
+ case MoveSpark:
+ event->spark->node = (StgClosure *)MarkRoot((StgClosure *)event->spark->node);
+ break;
+ case MoveThread:
+ event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
+ break;
+ case FindWork:
+ break;
+ case FetchNode:
+ event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
+ event->node = (StgClosure *)MarkRoot((StgClosure *)event->node);
+ break;
+ case FetchReply:
+ event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
+ if (RtsFlags.GranFlags.DoBulkFetching)
+ // ToDo: traverse_eventw_for_gc if GUM-Fetching!!! HWL
+ belch("ghuH: packets in BulkFetching not marked as roots; mayb be fatal");
+ else
+ event->node = (StgClosure *)MarkRoot((StgClosure *)event->node);
+ break;
+ case GlobalBlock:
+ event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
+ event->node = (StgClosure *)MarkRoot((StgClosure *)event->node);
+ break;
+ case UnblockThread:
+ event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
+ event->node = (StgClosure *)MarkRoot((StgClosure *)event->node);
+ break;
+ default:
+ barf("markEventQueue: trying to mark unknown event @ %p", event);
+ }}
+ IF_DEBUG(gc,
+ belch("GC: markEventQueue: %d events in queue", len));
+}
+
+/*
+ Prune all ContinueThread events related to tso or node in the eventq.
+ Currently used if a thread leaves STG land with ThreadBlocked status,
+ i.e. it blocked on a closure and has been put on its blocking queue. It
+ will be reawakended via a call to awakenBlockedQueue. Until then no
+ event effecting this tso should appear in the eventq. A bit of a hack,
+ because ideally we shouldn't generate such spurious ContinueThread events
+ in the first place.
+*/
+//@cindex prune_eventq
+void
+prune_eventq(tso, node)
+StgTSO *tso;
+StgClosure *node;
+{ rtsEventQ prev = (rtsEventQ)NULL, event = EventHd;
+
+ /* node unused for now */
+ ASSERT(node==NULL);
+ /* tso must be valid, then */
+ ASSERT(tso!=END_TSO_QUEUE);
+ while (event != NULL) {
+ if (event->evttype==ContinueThread &&
+ (event->tso==tso)) {
+ IF_GRAN_DEBUG(event_trace, // ToDo: use another debug flag
+ belch("prune_eventq: pruning ContinueThread event for TSO %d (%p) on PE %d @ %lx (%p)",
+ event->tso->id, event->tso, event->proc, event->time, event));
+ if (prev==(rtsEventQ)NULL) { // beginning of eventq
+ EventHd = event->next;
+ free(event);
+ event = EventHd;
+ } else {
+ prev->next = event->next;
+ free(event);
+ event = prev->next;
+ }
+ } else { // no pruning necessary; go to next event
+ prev = event;
+ event = event->next;
+ }
+ }
+}
+
+//@cindex print_event
+void
+print_event(event)
+rtsEvent *event;
+{
+ char str_tso[16], str_node[16];
+ StgThreadID tso_id;
+
+ if (event->tso==END_TSO_QUEUE) {
+ strcpy(str_tso, "______");
+ tso_id = 0;
+ } else {
+ sprintf(str_tso, "%p", event->tso);
+ tso_id = (event->tso==NULL) ? 0 : event->tso->id;
+ }
+ if (event->node==(StgClosure*)NULL) {
+ strcpy(str_node, "______");
+ } else {
+ sprintf(str_node, "%p", event->node);
+ }
+ // HWL: shouldn't be necessary; ToDo: nuke
+ //str_tso[6]='\0';
+ //str_node[6]='\0';
+
+ if (event==NULL)
+ fprintf(stderr,"Evt: NIL\n");
+ else
+ fprintf(stderr, "Evt: %s (%u), PE %u [%u], Time %lu, TSO %d (%s), Node %s\n", //"Evt: %s (%u), PE %u [%u], Time %u, TSO %s (%#l), Node %s\n",
+ event_names[event->evttype], event->evttype,
+ event->proc, event->creator, event->time,
+ tso_id, str_tso, str_node
+ /*, event->spark, event->next */ );
+
+}
+
+//@cindex print_eventq
+void
+print_eventq(hd)
+rtsEvent *hd;
+{
+ rtsEvent *x;
+
+ fprintf(stderr,"Event Queue with root at %p:\n", hd);
+ for (x=hd; x!=NULL; x=x->next) {
+ print_event(x);
+ }
+}
+
+/*
+ Spark queue functions are now all in Sparks.c!!
+*/
+//@node Scheduling functions, Thread Queue routines, Spark queue functions, GranSim specific code
+//@subsection Scheduling functions
+
+/*
+ These functions are variants of thread initialisation and therefore
+ related to initThread and friends in Schedule.c. However, they are
+ specific to a GranSim setup in storing more info in the TSO's statistics
+ buffer and sorting the thread queues etc.
+*/
+
+/*
+ A large portion of startThread deals with maintaining a sorted thread
+ queue, which is needed for the Priority Sparking option. Without that
+ complication the code boils down to FIFO handling.
+*/
+//@cindex insertThread
+void
+insertThread(tso, proc)
+StgTSO* tso;
+PEs proc;
+{
+ StgTSO *prev = NULL, *next = NULL;
+ nat count = 0;
+ rtsBool found = rtsFalse;
+
+ ASSERT(CurrentProc==proc);
+ ASSERT(!is_on_queue(tso,proc));
+ /* Idle proc: put the thread on the run queue
+ same for pri spark and basic version */
+ if (run_queue_hds[proc] == END_TSO_QUEUE)
+ {
+ /* too strong!
+ ASSERT((CurrentProc==MainProc &&
+ CurrentTime[MainProc]==0 &&
+ procStatus[MainProc]==Idle) ||
+ procStatus[proc]==Starting);
+ */
+ run_queue_hds[proc] = run_queue_tls[proc] = tso;
+
+ CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadqueuetime;
+
+ /* new_event of ContinueThread has been moved to do_the_startthread */
+
+ /* too strong!
+ ASSERT(procStatus[proc]==Idle ||
+ procStatus[proc]==Fishing ||
+ procStatus[proc]==Starting);
+ procStatus[proc] = Busy;
+ */
+ return;
+ }
+
+ if (RtsFlags.GranFlags.Light)
+ GranSimLight_insertThread(tso, proc);
+
+ /* Only for Pri Scheduling: find place where to insert tso into queue */
+ if (RtsFlags.GranFlags.DoPriorityScheduling && tso->gran.pri!=0)
+ /* {add_to_spark_queue}vo' jInIHta'; Qu' wa'DIch yIleghQo' */
+ for (prev = run_queue_hds[proc], next = run_queue_hds[proc]->link, count=0;
+ (next != END_TSO_QUEUE) &&
+ !(found = tso->gran.pri >= next->gran.pri);
+ prev = next, next = next->link, count++)
+ {
+ ASSERT((prev!=(StgTSO*)NULL || next==run_queue_hds[proc]) &&
+ (prev==(StgTSO*)NULL || prev->link==next));
+ }
+
+ ASSERT(!found || next != END_TSO_QUEUE);
+ ASSERT(procStatus[proc]!=Idle);
+
+ if (found) {
+ /* found can only be rtsTrue if pri scheduling enabled */
+ ASSERT(RtsFlags.GranFlags.DoPriorityScheduling);
+ if (RtsFlags.GranFlags.GranSimStats.Global)
+ globalGranStats.non_end_add_threads++;
+ /* Add tso to ThreadQueue between prev and next */
+ tso->link = next;
+ if ( next == (StgTSO*)END_TSO_QUEUE ) {
+ run_queue_tl = tso;
+ } else {
+ /* no back link for TSO chain */
+ }
+
+ if ( prev == (StgTSO*)END_TSO_QUEUE ) {
+ /* Never add TSO as first elem of thread queue; the first */
+ /* element should be the one that is currently running -- HWL */
+ IF_DEBUG(gran,
+ belch("GRAN: Qagh: NewThread (w/ PriorityScheduling): Trying to add TSO %p (PRI=%d) as first elem of threadQ (%p) on proc %u (@ %u)\n",
+ tso, tso->gran.pri, run_queue_hd, proc,
+ CurrentTime[proc]));
+ } else {
+ prev->link = tso;
+ }
+ } else { /* !found */ /* or not pri sparking! */
+ /* Add TSO to the end of the thread queue on that processor */
+ run_queue_tls[proc]->link = tso;
+ run_queue_tls[proc] = tso;
+ }
+ ASSERT(RtsFlags.GranFlags.DoPriorityScheduling || count==0);
+ CurrentTime[proc] += count * RtsFlags.GranFlags.Costs.pri_sched_overhead +
+ RtsFlags.GranFlags.Costs.threadqueuetime;
+
+ /* ToDo: check if this is still needed -- HWL
+ if (RtsFlags.GranFlags.DoThreadMigration)
+ ++SurplusThreads;
+
+ if (RtsFlags.GranFlags.GranSimStats.Full &&
+ !(( event_type == GR_START || event_type == GR_STARTQ) &&
+ RtsFlags.GranFlags.labelling) )
+ DumpRawGranEvent(proc, creator, event_type+1, tso, node,
+ tso->gran.sparkname, spark_queue_len(proc));
+ */
+
+# if defined(GRAN_CHECK)
+ /* Check if thread queue is sorted. Only for testing, really! HWL */
+ if ( RtsFlags.GranFlags.DoPriorityScheduling &&
+ (RtsFlags.GranFlags.Debug.sortedQ) ) {
+ rtsBool sorted = rtsTrue;
+ StgTSO *prev, *next;
+
+ if (run_queue_hds[proc]==END_TSO_QUEUE ||
+ run_queue_hds[proc]->link==END_TSO_QUEUE) {
+ /* just 1 elem => ok */
+ } else {
+ /* Qu' wa'DIch yIleghQo' (ignore first elem)! */
+ for (prev = run_queue_hds[proc]->link, next = prev->link;
+ (next != END_TSO_QUEUE) ;
+ prev = next, next = prev->link) {
+ ASSERT((prev!=(StgTSO*)NULL || next==run_queue_hds[proc]) &&
+ (prev==(StgTSO*)NULL || prev->link==next));
+ sorted = sorted &&
+ (prev->gran.pri >= next->gran.pri);
+ }
+ }
+ if (!sorted) {
+ fprintf(stderr,"Qagh: THREADQ on PE %d is not sorted:\n",
+ CurrentProc);
+ G_THREADQ(run_queue_hd,0x1);
+ }
+ }
+# endif
+}
+
+/*
+ insertThread, which is only used for GranSim Light, is similar to
+ startThread in that it adds a TSO to a thread queue. However, it assumes
+ that the thread queue is sorted by local clocks and it inserts the TSO at
+ the right place in the queue. Don't create any event, just insert.
+*/
+//@cindex GranSimLight_insertThread
+rtsBool
+GranSimLight_insertThread(tso, proc)
+StgTSO* tso;
+PEs proc;
+{
+ StgTSO *prev, *next;
+ nat count = 0;
+ rtsBool found = rtsFalse;
+
+ ASSERT(RtsFlags.GranFlags.Light);
+
+ /* In GrAnSim-Light we always have an idle `virtual' proc.
+ The semantics of the one-and-only thread queue is different here:
+ all threads in the queue are running (each on its own virtual processor);
+ the queue is only needed internally in the simulator to interleave the
+ reductions of the different processors.
+ The one-and-only thread queue is sorted by the local clocks of the TSOs.
+ */
+ ASSERT(run_queue_hds[proc] != END_TSO_QUEUE);
+ ASSERT(tso->link == END_TSO_QUEUE);
+
+ /* If only one thread in queue so far we emit DESCHEDULE in debug mode */
+ if (RtsFlags.GranFlags.GranSimStats.Full &&
+ (RtsFlags.GranFlags.Debug.checkLight) &&
+ (run_queue_hd->link == END_TSO_QUEUE)) {
+ DumpRawGranEvent(proc, proc, GR_DESCHEDULE,
+ run_queue_hds[proc], (StgClosure*)NULL,
+ tso->gran.sparkname, spark_queue_len(proc)); // ToDo: check spar_queue_len
+ // resched = rtsTrue;
+ }
+
+ /* this routine should only be used in a GrAnSim Light setup */
+ /* && CurrentProc must be 0 in GrAnSim Light setup */
+ ASSERT(RtsFlags.GranFlags.Light && CurrentProc==0);
+
+ /* Idle proc; same for pri spark and basic version */
+ if (run_queue_hd==END_TSO_QUEUE)
+ {
+ run_queue_hd = run_queue_tl = tso;
+ /* MAKE_BUSY(CurrentProc); */
+ return rtsTrue;
+ }
+
+ for (prev = run_queue_hds[proc], next = run_queue_hds[proc]->link, count = 0;
+ (next != END_TSO_QUEUE) &&
+ !(found = (tso->gran.clock < next->gran.clock));
+ prev = next, next = next->link, count++)
+ {
+ ASSERT((prev!=(StgTSO*)NULL || next==run_queue_hds[proc]) &&
+ (prev==(StgTSO*)NULL || prev->link==next));
+ }
+
+ /* found can only be rtsTrue if pri sparking enabled */
+ if (found) {
+ /* Add tso to ThreadQueue between prev and next */
+ tso->link = next;
+ if ( next == END_TSO_QUEUE ) {
+ run_queue_tls[proc] = tso;
+ } else {
+ /* no back link for TSO chain */
+ }
+
+ if ( prev == END_TSO_QUEUE ) {
+ run_queue_hds[proc] = tso;
+ } else {
+ prev->link = tso;
+ }
+ } else { /* !found */ /* or not pri sparking! */
+ /* Add TSO to the end of the thread queue on that processor */
+ run_queue_tls[proc]->link = tso;
+ run_queue_tls[proc] = tso;
+ }
+
+ if ( prev == END_TSO_QUEUE ) { /* new head of queue */
+ new_event(proc, proc, CurrentTime[proc],
+ ContinueThread,
+ tso, (StgClosure*)NULL, (rtsSpark*)NULL);
+ }
+ /*
+ if (RtsFlags.GranFlags.GranSimStats.Full &&
+ !(( event_type == GR_START || event_type == GR_STARTQ) &&
+ RtsFlags.GranFlags.labelling) )
+ DumpRawGranEvent(proc, creator, gr_evttype, tso, node,
+ tso->gran.sparkname, spark_queue_len(proc));
+ */
+ return rtsTrue;
+}
+
+/*
+ endThread is responsible for general clean-up after the thread tso has
+ finished. This includes emitting statistics into the profile etc.
+*/
+void
+endThread(StgTSO *tso, PEs proc)
+{
+ ASSERT(procStatus[proc]==Busy); // coming straight out of STG land
+ ASSERT(tso->what_next==ThreadComplete);
+ // ToDo: prune ContinueThreads for this TSO from event queue
+ DumpEndEvent(proc, tso, rtsFalse /* not mandatory */);
+
+ /* if this was the last thread on this PE then make it Idle */
+ if (run_queue_hds[proc]==END_TSO_QUEUE) {
+ procStatus[CurrentProc] = Idle;
+ }
+}
+
+//@node Thread Queue routines, GranSim functions, Scheduling functions, GranSim specific code
+//@subsection Thread Queue routines
+
+/*
+ Check whether given tso resides on the run queue of the current processor.
+ Only used for debugging.
+*/
+
+//@cindex is_on_queue
+rtsBool
+is_on_queue (StgTSO *tso, PEs proc)
+{
+ StgTSO *t;
+ rtsBool found;
+
+ for (t=run_queue_hds[proc], found=rtsFalse;
+ t!=END_TSO_QUEUE && !(found = t==tso);
+ t=t->link)
+ /* nothing */ ;
+
+ return found;
+}
+
+/* This routine is only used for keeping a statistics of thread queue
+ lengths to evaluate the impact of priority scheduling. -- HWL
+ {spark_queue_len}vo' jInIHta'
+*/
+//@cindex thread_queue_len
+nat
+thread_queue_len(PEs proc)
+{
+ StgTSO *prev, *next;
+ nat len;
+
+ for (len = 0, prev = END_TSO_QUEUE, next = run_queue_hds[proc];
+ next != END_TSO_QUEUE;
+ len++, prev = next, next = prev->link)
+ {}
+
+ return (len);
+}
+
+//@node GranSim functions, GranSimLight routines, Thread Queue routines, GranSim specific code
+//@subsection GranSim functions
+
+/* ----------------------------------------------------------------- */
+/* The main event handling functions; called from Schedule.c (schedule) */
+/* ----------------------------------------------------------------- */
+
+//@cindex do_the_globalblock
+
+void
+do_the_globalblock(rtsEvent* event)
+{
+ PEs proc = event->proc; /* proc that requested node */
+ StgTSO *tso = event->tso; /* tso that requested node */
+ StgClosure *node = event->node; /* requested, remote node */
+
+ IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the GlobalBlock\n"));
+ /* There should be no GLOBALBLOCKs in GrAnSim Light setup */
+ ASSERT(!RtsFlags.GranFlags.Light);
+ /* GlobalBlock events only valid with GUM fetching */
+ ASSERT(RtsFlags.GranFlags.DoBulkFetching);
+
+ IF_GRAN_DEBUG(bq, // globalBlock,
+ if (IS_LOCAL_TO(PROCS(node),proc)) {
+ belch("## Qagh: GlobalBlock: Blocking TSO %d (%p) on LOCAL node %p (PE %d).\n",
+ tso->id, tso, node, proc);
+ });
+
+ /* CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.munpacktime; */
+ if ( blockFetch(tso,proc,node) != 0 )
+ return; /* node has become local by now */
+
+#if 0
+ ToDo: check whether anything has to be done at all after blockFetch -- HWL
+
+ if (!RtsFlags.GranFlags.DoAsyncFetch) { /* head of queue is next thread */
+ StgTSO* tso = run_queue_hds[proc]; /* awaken next thread */
+ if (tso != (StgTSO*)NULL) {
+ new_event(proc, proc, CurrentTime[proc],
+ ContinueThread,
+ tso, (StgClosure*)NULL, (rtsSpark*)NULL);
+ CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadcontextswitchtime;
+ if (RtsFlags.GranFlags.GranSimStats.Full)
+ DumpRawGranEvent(proc, CurrentProc, GR_SCHEDULE, tso,
+ (StgClosure*)NULL, tso->gran.sparkname, spark_queue_len(CurrentProc)); // ToDo: check sparkname and spar_queue_len
+ procStatus[proc] = Busy; /* might have been fetching */
+ } else {
+ procStatus[proc] = Idle; /* no work on proc now */
+ }
+ } else { /* RtsFlags.GranFlags.DoAsyncFetch i.e. block-on-fetch */
+ /* other thread is already running */
+ /* 'oH 'utbe' 'e' vIHar ; I think that's not needed -- HWL
+ new_event(proc,proc,CurrentTime[proc],
+ CONTINUETHREAD,EVENT_TSO(event),
+ (RtsFlags.GranFlags.DoBulkFetching ? closure :
+ EVENT_NODE(event)),NULL);
+ */
+ }
+#endif
+}
+
+//@cindex do_the_unblock
+
+void
+do_the_unblock(rtsEvent* event)
+{
+ PEs proc = event->proc, /* proc that requested node */
+ creator = event->creator; /* proc that requested node */
+ StgTSO* tso = event->tso; /* tso that requested node */
+ StgClosure* node = event->node; /* requested, remote node */
+
+ IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the UnBlock\n"))
+ /* There should be no UNBLOCKs in GrAnSim Light setup */
+ ASSERT(!RtsFlags.GranFlags.Light);
+ /* UnblockThread means either FetchReply has arrived or
+ a blocking queue has been awakened;
+ ToDo: check with assertions
+ ASSERT(procStatus[proc]==Fetching || IS_BLACK_HOLE(event->node));
+ */
+ if (!RtsFlags.GranFlags.DoAsyncFetch) { /* block-on-fetch */
+ /* We count block-on-fetch as normal block time */
+ tso->gran.blocktime += CurrentTime[proc] - tso->gran.blockedat;
+ /* Dumping now done when processing the event
+ No costs for contextswitch or thread queueing in this case
+ if (RtsFlags.GranFlags.GranSimStats.Full)
+ DumpRawGranEvent(proc, CurrentProc, GR_RESUME, tso,
+ (StgClosure*)NULL, tso->gran.sparkname, spark_queue_len(CurrentProc));
+ */
+ /* Maybe do this in FetchReply already
+ if (procStatus[proc]==Fetching)
+ procStatus[proc] = Busy;
+ */
+ /*
+ new_event(proc, proc, CurrentTime[proc],
+ ContinueThread,
+ tso, node, (rtsSpark*)NULL);
+ */
+ } else {
+ /* Asynchr comm causes additional costs here: */
+ /* Bring the TSO from the blocked queue into the threadq */
+ }
+ /* In all cases, the UnblockThread causes a ResumeThread to be scheduled */
+ new_event(proc, proc,
+ CurrentTime[proc]+RtsFlags.GranFlags.Costs.threadqueuetime,
+ ResumeThread,
+ tso, node, (rtsSpark*)NULL);
+}
+
+//@cindex do_the_fetchnode
+
+void
+do_the_fetchnode(rtsEvent* event)
+{
+ PEs proc = event->proc, /* proc that holds the requested node */
+ creator = event->creator; /* proc that requested node */
+ StgTSO* tso = event->tso;
+ StgClosure* node = event->node; /* requested, remote node */
+ rtsFetchReturnCode rc;
+
+ ASSERT(CurrentProc==proc);
+ /* There should be no FETCHNODEs in GrAnSim Light setup */
+ ASSERT(!RtsFlags.GranFlags.Light);
+
+ IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the FetchNode\n"));
+
+ CurrentTime[proc] += RtsFlags.GranFlags.Costs.munpacktime;
+
+ /* ToDo: check whether this is the right place for dumping the event */
+ if (RtsFlags.GranFlags.GranSimStats.Full)
+ DumpRawGranEvent(creator, proc, GR_FETCH, tso, node, (StgInt)0, 0);
+
+ do {
+ rc = handleFetchRequest(node, proc, creator, tso);
+ if (rc == OutOfHeap) { /* trigger GC */
+# if defined(GRAN_CHECK) && defined(GRAN)
+ if (RtsFlags.GcFlags.giveStats)
+ fprintf(RtsFlags.GcFlags.statsFile,"***** veQ boSwI' PackNearbyGraph(node %p, tso %p (%d))\n",
+ node, tso, tso->id);
+# endif
+ barf("//// do_the_fetchnode: out of heap after handleFetchRequest; ToDo: call GarbageCollect()");
+ prepend_event(event);
+ GarbageCollect(GetRoots, rtsFalse);
+ // HWL: ToDo: check whether a ContinueThread has to be issued
+ // HWL old: ReallyPerformThreadGC(PACK_HEAP_REQUIRED, rtsFalse);
+# if 0 && defined(GRAN_CHECK) && defined(GRAN)
+ if (RtsFlags.GcFlags.giveStats) {
+ fprintf(RtsFlags.GcFlags.statsFile,"***** SAVE_Hp=%p, SAVE_HpLim=%p, PACK_HEAP_REQUIRED=%d\n",
+ Hp, HpLim, 0) ; // PACK_HEAP_REQUIRED); ???
+ fprintf(stderr,"***** No. of packets so far: %d (total size: %d)\n",
+ globalGranStats.tot_packets, globalGranStats.tot_packet_size);
+ }
+# endif
+ event = grab_event();
+ // Hp -= PACK_HEAP_REQUIRED; // ???
+
+ /* GC knows that events are special and follows the pointer i.e. */
+ /* events are valid even if they moved. An EXIT is triggered */
+ /* if there is not enough heap after GC. */
+ }
+ } while (rc == OutOfHeap);
+}
+
+//@cindex do_the_fetchreply
+void
+do_the_fetchreply(rtsEvent* event)
+{
+ PEs proc = event->proc, /* proc that requested node */
+ creator = event->creator; /* proc that holds the requested node */
+ StgTSO* tso = event->tso;
+ StgClosure* node = event->node; /* requested, remote node */
+ StgClosure* closure=(StgClosure*)NULL;
+
+ ASSERT(CurrentProc==proc);
+ ASSERT(RtsFlags.GranFlags.DoAsyncFetch || procStatus[proc]==Fetching);
+
+ IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the FetchReply\n"));
+ /* There should be no FETCHREPLYs in GrAnSim Light setup */
+ ASSERT(!RtsFlags.GranFlags.Light);
+
+ /* assign message unpack costs *before* dumping the event */
+ CurrentTime[proc] += RtsFlags.GranFlags.Costs.munpacktime;
+
+ /* ToDo: check whether this is the right place for dumping the event */
+ if (RtsFlags.GranFlags.GranSimStats.Full)
+ DumpRawGranEvent(proc, creator, GR_REPLY, tso, node,
+ tso->gran.sparkname, spark_queue_len(proc));
+
+ /* THIS SHOULD NEVER HAPPEN
+ If tso is in the BQ of node this means that it actually entered the
+ remote closure, due to a missing GranSimFetch at the beginning of the
+ entry code; therefore, this is actually a faked fetch, triggered from
+ within GranSimBlock;
+ since tso is both in the EVQ and the BQ for node, we have to take it out
+ of the BQ first before we can handle the FetchReply;
+ ToDo: special cases in awakenBlockedQueue, since the BQ magically moved.
+ */
+ if (tso->block_info.closure!=(StgClosure*)NULL) {
+ IF_GRAN_DEBUG(bq,
+ belch("## ghuH: TSO %d (%p) in FetchReply is blocked on node %p (shouldn't happen AFAIK)",
+ tso->id, tso, node));
+ // unlink_from_bq(tso, node);
+ }
+
+ if (RtsFlags.GranFlags.DoBulkFetching) { /* bulk (packet) fetching */
+ rtsPackBuffer *buffer = (rtsPackBuffer*)node;
+ nat size = buffer->size;
+
+ /* NB: Fetch misses can't occur with GUM fetching, as */
+ /* updatable closure are turned into RBHs and therefore locked */
+ /* for other processors that try to grab them. */
+
+ closure = UnpackGraph(buffer);
+ CurrentTime[proc] += size * RtsFlags.GranFlags.Costs.munpacktime;
+ } else // incremental fetching
+ /* Copy or move node to CurrentProc */
+ if (fetchNode(node, creator, proc)) {
+ /* Fetch has failed i.e. node has been grabbed by another PE */
+ PEs p = where_is(node);
+ rtsTime fetchtime;
+
+ if (RtsFlags.GranFlags.GranSimStats.Global)
+ globalGranStats.fetch_misses++;
+
+ IF_GRAN_DEBUG(thunkStealing,
+ belch("== Qu'vatlh! fetch miss @ %u: node %p is at proc %u (rather than proc %u)\n",
+ CurrentTime[proc],node,p,creator));
+
+ CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime;
+
+ /* Count fetch again !? */
+ ++(tso->gran.fetchcount);
+ tso->gran.fetchtime += RtsFlags.GranFlags.Costs.fetchtime;
+
+ fetchtime = stg_max(CurrentTime[CurrentProc],CurrentTime[p]) +
+ RtsFlags.GranFlags.Costs.latency;
+
+ /* Chase the grabbed node */
+ new_event(p, proc, fetchtime,
+ FetchNode,
+ tso, node, (rtsSpark*)NULL);
+
+# if 0 && defined(GRAN_CHECK) && defined(GRAN) /* Just for testing */
+ IF_GRAN_DEBUG(blockOnFetch,
+ BlockedOnFetch[CurrentProc] = tso;) /*-rtsTrue;-*/
+
+ IF_GRAN_DEBUG(blockOnFetch_sanity,
+ tso->type |= FETCH_MASK_TSO;)
+# endif
+
+ CurrentTime[proc] += RtsFlags.GranFlags.Costs.mtidytime;
+
+ return; /* NB: no REPLy has been processed; tso still sleeping */
+ }
+
+ /* -- Qapla'! Fetch has been successful; node is here, now */
+ ++(event->tso->gran.fetchcount);
+ event->tso->gran.fetchtime += RtsFlags.GranFlags.Costs.fetchtime;
+
+ /* this is now done at the beginning of this routine
+ if (RtsFlags.GranFlags.GranSimStats.Full)
+ DumpRawGranEvent(proc,event->creator, GR_REPLY, event->tso,
+ (RtsFlags.GranFlags.DoBulkFetching ?
+ closure :
+ event->node),
+ tso->gran.sparkname, spark_queue_len(proc));
+ */
+
+ ASSERT(OutstandingFetches[proc] > 0);
+ --OutstandingFetches[proc];
+ new_event(proc, proc, CurrentTime[proc],
+ ResumeThread,
+ event->tso, (RtsFlags.GranFlags.DoBulkFetching ?
+ closure :
+ event->node),
+ (rtsSpark*)NULL);
+}
+
+//@cindex do_the_movethread
+
+void
+do_the_movethread(rtsEvent* event) {
+ PEs proc = event->proc, /* proc that requested node */
+ creator = event->creator; /* proc that holds the requested node */
+ StgTSO* tso = event->tso;
+
+ IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the MoveThread\n"));
+
+ ASSERT(CurrentProc==proc);
+ /* There should be no MOVETHREADs in GrAnSim Light setup */
+ ASSERT(!RtsFlags.GranFlags.Light);
+ /* MOVETHREAD events should never occur without -bM */
+ ASSERT(RtsFlags.GranFlags.DoThreadMigration);
+ /* Bitmask of moved thread should be 0 */
+ ASSERT(PROCS(tso)==0);
+ ASSERT(procStatus[proc] == Fishing ||
+ RtsFlags.GranFlags.DoAsyncFetch);
+ ASSERT(OutstandingFishes[proc]>0);
+
+ /* ToDo: exact costs for unpacking the whole TSO */
+ CurrentTime[proc] += 5l * RtsFlags.GranFlags.Costs.munpacktime;
+
+ /* ToDo: check whether this is the right place for dumping the event */
+ if (RtsFlags.GranFlags.GranSimStats.Full)
+ DumpRawGranEvent(proc, creator,
+ GR_STOLEN, tso, (StgClosure*)NULL, (StgInt)0, 0);
+
+ // ToDo: check cost functions
+ --OutstandingFishes[proc];
+ SET_GRAN_HDR(tso, ThisPE); // adjust the bitmask for the TSO
+ insertThread(tso, proc);
+
+ if (procStatus[proc]==Fishing)
+ procStatus[proc] = Idle;
+
+ if (RtsFlags.GranFlags.GranSimStats.Global)
+ globalGranStats.tot_TSOs_migrated++;
+}
+
+//@cindex do_the_movespark
+
+void
+do_the_movespark(rtsEvent* event) {
+ PEs proc = event->proc, /* proc that requested spark */
+ creator = event->creator; /* proc that holds the requested spark */
+ StgTSO* tso = event->tso;
+ rtsSparkQ spark = event->spark;
+
+ IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the MoveSpark\n"))
+
+ ASSERT(CurrentProc==proc);
+ ASSERT(spark!=NULL);
+ ASSERT(procStatus[proc] == Fishing ||
+ RtsFlags.GranFlags.DoAsyncFetch);
+ ASSERT(OutstandingFishes[proc]>0);
+
+ CurrentTime[proc] += RtsFlags.GranFlags.Costs.munpacktime;
+
+ /* record movement of spark only if spark profiling is turned on */
+ if (RtsFlags.GranFlags.GranSimStats.Sparks)
+ DumpRawGranEvent(proc, creator,
+ SP_ACQUIRED,
+ tso, spark->node, spark->name, spark_queue_len(proc));
+
+ /* global statistics */
+ if ( RtsFlags.GranFlags.GranSimStats.Global &&
+ !closure_SHOULD_SPARK(spark->node))
+ globalGranStats.withered_sparks++;
+ /* Not adding the spark to the spark queue would be the right */
+ /* thing here, but it also would be cheating, as this info can't be */
+ /* available in a real system. -- HWL */
+
+ --OutstandingFishes[proc];
+
+ add_to_spark_queue(spark);
+
+ IF_GRAN_DEBUG(randomSteal, // ToDo: spark-distribution flag
+ print_sparkq_stats());
+
+ /* Should we treat stolen sparks specially? Currently, we don't. */
+
+ if (procStatus[proc]==Fishing)
+ procStatus[proc] = Idle;
+
+ /* add_to_spark_queue will increase the time of the current proc. */
+ /*
+ If proc was fishing, it is Idle now with the new spark in its spark
+ pool. This means that the next time handleIdlePEs is called, a local
+ FindWork will be created on this PE to turn the spark into a thread. Of
+ course another PE might steal the spark in the meantime (that's why we
+ are using events rather than inlining all the operations in the first
+ place). */
+}
+
+/*
+ In the Constellation class version of GranSim the semantics of StarThread
+ events has changed. Now, StartThread has to perform 3 basic operations:
+ - create a new thread (previously this was done in ActivateSpark);
+ - insert the thread into the run queue of the current processor
+ - generate a new event for actually running the new thread
+ Note that the insertThread is called via createThread.
+*/
+
+//@cindex do_the_startthread
+
+void
+do_the_startthread(rtsEvent *event)
+{
+ PEs proc = event->proc; /* proc that requested node */
+ StgTSO *tso = event->tso; /* tso that requested node */
+ StgClosure *node = event->node; /* requested, remote node */
+ rtsSpark *spark = event->spark;
+ GranEventType gr_evttype;
+
+ ASSERT(CurrentProc==proc);
+ ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
+ ASSERT(event->evttype == ResumeThread || event->evttype == StartThread);
+ /* if this was called via StartThread: */
+ ASSERT(event->evttype!=StartThread || tso == END_TSO_QUEUE); // not yet created
+ // ToDo: check: ASSERT(event->evttype!=StartThread || procStatus[proc]==Starting);
+ /* if this was called via ResumeThread: */
+ ASSERT(event->evttype!=ResumeThread ||
+ RtsFlags.GranFlags.DoAsyncFetch ||!is_on_queue(tso,proc));
+
+ /* startThread may have been called from the main event handler upon
+ finding either a ResumeThread or a StartThread event; set the
+ gr_evttype (needed for writing to .gr file) accordingly */
+ // gr_evttype = (event->evttype == ResumeThread) ? GR_RESUME : GR_START;
+
+ if ( event->evttype == StartThread ) {
+ GranEventType gr_evttype = (run_queue_hds[proc]==END_TSO_QUEUE) ?
+ GR_START : GR_STARTQ;
+
+ tso = createThread(BLOCK_SIZE_W, spark->gran_info);// implicit insertThread!
+ pushClosure(tso, node);
+
+ // ToDo: fwd info on local/global spark to thread -- HWL
+ // tso->gran.exported = spark->exported;
+ // tso->gran.locked = !spark->global;
+ tso->gran.sparkname = spark->name;
+
+ ASSERT(CurrentProc==proc);
+ if (RtsFlags.GranFlags.GranSimStats.Full)
+ DumpGranEvent(gr_evttype,tso);
+
+ CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadcreatetime;
+ } else { // event->evttype == ResumeThread
+ GranEventType gr_evttype = (run_queue_hds[proc]==END_TSO_QUEUE) ?
+ GR_RESUME : GR_RESUMEQ;
+
+ insertThread(tso, proc);
+
+ ASSERT(CurrentProc==proc);
+ if (RtsFlags.GranFlags.GranSimStats.Full)
+ DumpGranEvent(gr_evttype,tso);
+ }
+
+ ASSERT(run_queue_hds[proc]!=END_TSO_QUEUE); // non-empty run queue
+ procStatus[proc] = Busy;
+ /* make sure that this thread is actually run */
+ new_event(proc, proc,
+ CurrentTime[proc],
+ ContinueThread,
+ tso, node, (rtsSpark*)NULL);
+
+ /* A wee bit of statistics gathering */
+ if (RtsFlags.GranFlags.GranSimStats.Global) {
+ globalGranStats.tot_add_threads++;
+ globalGranStats.tot_tq_len += thread_queue_len(CurrentProc);
+ }
+
+}
+
+//@cindex do_the_findwork
+void
+do_the_findwork(rtsEvent* event)
+{
+ PEs proc = event->proc, /* proc to search for work */
+ creator = event->creator; /* proc that requested work */
+ rtsSparkQ spark = event->spark;
+ /* ToDo: check that this size is safe -- HWL */
+#if 0
+ ToDo: check available heap
+
+ nat req_heap = sizeofW(StgTSO) + MIN_STACK_WORDS;
+ // add this? -- HWL:RtsFlags.ConcFlags.stkChunkSize;
+#endif
+
+ IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the Findwork\n"));
+
+ /* If GUM style fishing is enabled, the contents of the spark field says
+ what to steal (spark(1) or thread(2)); */
+ ASSERT(!(RtsFlags.GranFlags.Fishing && event->spark==(rtsSpark*)0));
+
+ /* Make sure that we have enough heap for creating a new
+ thread. This is a conservative estimate of the required heap.
+ This eliminates special checks for GC around NewThread within
+ ActivateSpark. */
+
+#if 0
+ ToDo: check available heap
+
+ if (Hp + req_heap > HpLim ) {
+ IF_DEBUG(gc,
+ belch("GC: Doing GC from within Findwork handling (that's bloody dangerous if you ask me)");)
+ GarbageCollect(GetRoots);
+ // ReallyPerformThreadGC(req_heap, rtsFalse); old -- HWL
+ Hp -= req_heap;
+ if (procStatus[CurrentProc]==Sparking)
+ procStatus[CurrentProc]=Idle;
+ return;
+ }
+#endif
+
+ if ( RtsFlags.GranFlags.DoAlwaysCreateThreads ||
+ RtsFlags.GranFlags.Fishing ||
+ ((procStatus[proc]==Idle || procStatus[proc]==Sparking) &&
+ (RtsFlags.GranFlags.FetchStrategy >= 2 ||
+ OutstandingFetches[proc] == 0)) )
+ {
+ rtsBool found;
+ rtsSparkQ prev, spark;
+
+ /* ToDo: check */
+ ASSERT(procStatus[proc]==Sparking ||
+ RtsFlags.GranFlags.DoAlwaysCreateThreads ||
+ RtsFlags.GranFlags.Fishing);
+
+ /* SImmoHwI' yInej! Search spark queue! */
+ /* gimme_spark (event, &found, &spark); */
+ findLocalSpark(event, &found, &spark);
+
+ if (!found) { /* pagh vumwI' */
+ /*
+ If no spark has been found this can mean 2 things:
+ 1/ The FindWork was a fish (i.e. a message sent by another PE) and
+ the spark pool of the receiver is empty
+ --> the fish has to be forwarded to another PE
+ 2/ The FindWork was local to this PE (i.e. no communication; in this
+ case creator==proc) and the spark pool of the PE is not empty
+ contains only sparks of closures that should not be sparked
+ (note: if the spark pool were empty, handleIdlePEs wouldn't have
+ generated a FindWork in the first place)
+ --> the PE has to be made idle to trigger stealing sparks the next
+ time handleIdlePEs is performed
+ */
+
+ ASSERT(pending_sparks_hds[proc]==(rtsSpark*)NULL);
+ if (creator==proc) {
+ /* local FindWork */
+ if (procStatus[proc]==Busy) {
+ belch("ghuH: PE %d in Busy state while processing local FindWork (spark pool is empty!) @ %lx",
+ proc, CurrentTime[proc]);
+ procStatus[proc] = Idle;
+ }
+ } else {
+ /* global FindWork i.e. a Fish */
+ ASSERT(RtsFlags.GranFlags.Fishing);
+ /* actually this generates another request from the originating PE */
+ ASSERT(OutstandingFishes[creator]>0);
+ OutstandingFishes[creator]--;
+ /* ToDo: assign costs for sending fish to proc not to creator */
+ stealSpark(creator); /* might steal from same PE; ToDo: fix */
+ ASSERT(RtsFlags.GranFlags.maxFishes!=1 || procStatus[creator] == Fishing);
+ /* any assertions on state of proc possible here? */
+ }
+ } else {
+ /* DaH chu' Qu' yIchen! Now create new work! */
+ IF_GRAN_DEBUG(findWork,
+ belch("+- munching spark %p; creating thread for node %p",
+ spark, spark->node));
+ activateSpark (event, spark);
+ ASSERT(spark != (rtsSpark*)NULL);
+ spark = delete_from_sparkq (spark, proc, rtsTrue);
+ }
+
+ IF_GRAN_DEBUG(findWork,
+ belch("+- Contents of spark queues at the end of FindWork @ %lx",
+ CurrentTime[proc]);
+ print_sparkq_stats());
+
+ /* ToDo: check ; not valid if GC occurs in ActivateSpark */
+ ASSERT(!found ||
+ /* forward fish or */
+ (proc!=creator ||
+ /* local spark or */
+ (proc==creator && procStatus[proc]==Starting)) ||
+ //(!found && procStatus[proc]==Idle) ||
+ RtsFlags.GranFlags.DoAlwaysCreateThreads);
+ } else {
+ IF_GRAN_DEBUG(findWork,
+ belch("+- RTS refuses to findWork on PE %d @ %lx",
+ proc, CurrentTime[proc]);
+ belch(" procStatus[%d]=%s, fetch strategy=%d, outstanding fetches[%d]=%d",
+ proc, proc_status_names[procStatus[proc]],
+ RtsFlags.GranFlags.FetchStrategy,
+ proc, OutstandingFetches[proc]));
+ }
+}
+
+//@node GranSimLight routines, Code for Fetching Nodes, GranSim functions, GranSim specific code
+//@subsection GranSimLight routines
+
+/*
+ This code is called from the central scheduler after having rgabbed a
+ new event and is only needed for GranSim-Light. It mainly adjusts the
+ ActiveTSO so that all costs that have to be assigned from within the
+ scheduler are assigned to the right TSO. The choice of ActiveTSO depends
+ on the type of event that has been found.
+*/
+
+void
+GranSimLight_enter_system(event, ActiveTSOp)
+rtsEvent *event;
+StgTSO **ActiveTSOp;
+{
+ StgTSO *ActiveTSO = *ActiveTSOp;
+
+ ASSERT (RtsFlags.GranFlags.Light);
+
+ /* Restore local clock of the virtual processor attached to CurrentTSO.
+ All costs will be associated to the `virt. proc' on which the tso
+ is living. */
+ if (ActiveTSO != NULL) { /* already in system area */
+ ActiveTSO->gran.clock = CurrentTime[CurrentProc];
+ if (RtsFlags.GranFlags.DoFairSchedule)
+ {
+ if (RtsFlags.GranFlags.GranSimStats.Full &&
+ RtsFlags.GranFlags.Debug.checkLight)
+ DumpGranEvent(GR_SYSTEM_END,ActiveTSO);
+ }
+ }
+ switch (event->evttype)
+ {
+ case ContinueThread:
+ case FindWork: /* inaccurate this way */
+ ActiveTSO = run_queue_hd;
+ break;
+ case ResumeThread:
+ case StartThread:
+ case MoveSpark: /* has tso of virt proc in tso field of event */
+ ActiveTSO = event->tso;
+ break;
+ default: barf("Illegal event type %s (%d) in GrAnSim Light setup\n",
+ event_names[event->evttype],event->evttype);
+ }
+ CurrentTime[CurrentProc] = ActiveTSO->gran.clock;
+ if (RtsFlags.GranFlags.DoFairSchedule) {
+ if (RtsFlags.GranFlags.GranSimStats.Full &&
+ RtsFlags.GranFlags.Debug.checkLight)
+ DumpGranEvent(GR_SYSTEM_START,ActiveTSO);
+ }
+}
+
+void
+GranSimLight_leave_system(event, ActiveTSOp)
+rtsEvent *event;
+StgTSO **ActiveTSOp;
+{
+ StgTSO *ActiveTSO = *ActiveTSOp;
+
+ ASSERT(RtsFlags.GranFlags.Light);
+
+ /* Save time of `virt. proc' which was active since last getevent and
+ restore time of `virt. proc' where CurrentTSO is living on. */
+ if(RtsFlags.GranFlags.DoFairSchedule) {
+ if (RtsFlags.GranFlags.GranSimStats.Full &&
+ RtsFlags.GranFlags.Debug.checkLight) // ToDo: clean up flags
+ DumpGranEvent(GR_SYSTEM_END,ActiveTSO);
+ }
+ ActiveTSO->gran.clock = CurrentTime[CurrentProc];
+ ActiveTSO = (StgTSO*)NULL;
+ CurrentTime[CurrentProc] = CurrentTSO->gran.clock;
+ if (RtsFlags.GranFlags.DoFairSchedule /* && resched */ ) {
+ // resched = rtsFalse;
+ if (RtsFlags.GranFlags.GranSimStats.Full &&
+ RtsFlags.GranFlags.Debug.checkLight)
+ DumpGranEvent(GR_SCHEDULE,run_queue_hd);
+ }
+ /*
+ if (TSO_LINK(ThreadQueueHd)!=PrelBase_Z91Z93_closure &&
+ (TimeOfNextEvent == 0 ||
+ TSO_CLOCK(TSO_LINK(ThreadQueueHd))+1000<TimeOfNextEvent)) {
+ new_event(CurrentProc,CurrentProc,TSO_CLOCK(TSO_LINK(ThreadQueueHd))+1000,
+ CONTINUETHREAD,TSO_LINK(ThreadQueueHd),PrelBase_Z91Z93_closure,NULL);
+ TimeOfNextEvent = get_time_of_next_event();
+ }
+ */
+}
+
+//@node Code for Fetching Nodes, Idle PEs, GranSimLight routines, GranSim specific code
+//@subsection Code for Fetching Nodes
+
+/*
+ The following GrAnSim routines simulate the fetching of nodes from a
+ remote processor. We use a 1 word bitmask to indicate on which processor
+ a node is lying. Thus, moving or copying a node from one processor to
+ another just requires an appropriate change in this bitmask (using
+ @SET_GA@). Additionally, the clocks have to be updated.
+
+ A special case arises when the node that is needed by processor A has
+ been moved from a processor B to a processor C between sending out a
+ @FETCH@ (from A) and its arrival at B. In that case the @FETCH@ has to
+ be forwarded to C. This is simulated by issuing another FetchNode event
+ on processor C with A as creator.
+*/
+
+/* ngoqvam che' {GrAnSim}! */
+
+/* Fetch node "node" to processor "p" */
+
+//@cindex fetchNode
+
+rtsFetchReturnCode
+fetchNode(node,from,to)
+StgClosure* node;
+PEs from, to;
+{
+ /* In case of RtsFlags.GranFlags.DoBulkFetching this fct should never be
+ entered! Instead, UnpackGraph is used in ReSchedule */
+ StgClosure* closure;
+
+ ASSERT(to==CurrentProc);
+ /* Should never be entered in GrAnSim Light setup */
+ ASSERT(!RtsFlags.GranFlags.Light);
+ /* fetchNode should never be entered with DoBulkFetching */
+ ASSERT(!RtsFlags.GranFlags.DoBulkFetching);
+
+ /* Now fetch the node */
+ if (!IS_LOCAL_TO(PROCS(node),from) &&
+ !IS_LOCAL_TO(PROCS(node),to) )
+ return NodeHasMoved;
+
+ if (closure_HNF(node)) /* node already in head normal form? */
+ node->header.gran.procs |= PE_NUMBER(to); /* Copy node */
+ else
+ node->header.gran.procs = PE_NUMBER(to); /* Move node */
+
+ return Ok;
+}
+
+/*
+ Process a fetch request.
+
+ Cost of sending a packet of size n = C + P*n
+ where C = packet construction constant,
+ P = cost of packing one word into a packet
+ [Should also account for multiple packets].
+*/
+
+//@cindex handleFetchRequest
+
+rtsFetchReturnCode
+handleFetchRequest(node,to,from,tso)
+StgClosure* node; // the node which is requested
+PEs to, from; // fetch request: from -> to
+StgTSO* tso; // the tso which needs the node
+{
+ ASSERT(!RtsFlags.GranFlags.Light);
+ /* ToDo: check assertion */
+ ASSERT(OutstandingFetches[from]>0);
+
+ /* probably wrong place; */
+ ASSERT(CurrentProc==to);
+
+ if (IS_LOCAL_TO(PROCS(node), from)) /* Somebody else moved node already => */
+ { /* start tso */
+ IF_GRAN_DEBUG(thunkStealing,
+ fprintf(stderr,"ghuH: handleFetchRequest entered with local node %p (%s) (PE %d)\n",
+ node, info_type(node), from));
+
+ if (RtsFlags.GranFlags.DoBulkFetching) {
+ nat size;
+ rtsPackBuffer *graph;
+
+ /* Create a 1-node-buffer and schedule a FETCHREPLY now */
+ graph = PackOneNode(node, tso, &size);
+ new_event(from, to, CurrentTime[to],
+ FetchReply,
+ tso, (StgClosure *)graph, (rtsSpark*)NULL);
+ } else {
+ new_event(from, to, CurrentTime[to],
+ FetchReply,
+ tso, node, (rtsSpark*)NULL);
+ }
+ IF_GRAN_DEBUG(thunkStealing,
+ belch("== majQa'! closure %p is local on PE %d already (this is a good thing)", node, from));
+ return (NodeIsLocal);
+ }
+ else if (IS_LOCAL_TO(PROCS(node), to) ) /* Is node still here? */
+ {
+ if (RtsFlags.GranFlags.DoBulkFetching) { /* {GUM}vo' ngoqvam vInIHta' */
+ nat size; /* (code from GUM) */
+ StgClosure* graph;
+
+ if (IS_BLACK_HOLE(node)) { /* block on BH or RBH */
+ new_event(from, to, CurrentTime[to],
+ GlobalBlock,
+ tso, node, (rtsSpark*)NULL);
+ /* Note: blockFetch is done when handling GLOBALBLOCK event;
+ make sure the TSO stays out of the run queue */
+ /* When this thread is reawoken it does the usual: it tries to
+ enter the updated node and issues a fetch if it's remote.
+ It has forgotten that it has sent a fetch already (i.e. a
+ FETCHNODE is swallowed by a BH, leaving the thread in a BQ) */
+ --OutstandingFetches[from];
+
+ IF_GRAN_DEBUG(thunkStealing,
+ belch("== majQa'! closure %p on PE %d is a BH (demander=PE %d); faking a FMBQ",
+ node, to, from));
+ if (RtsFlags.GranFlags.GranSimStats.Global) {
+ globalGranStats.tot_FMBQs++;
+ }
+ return (NodeIsBH);
+ }
+
+ /* The tso requesting the node is blocked and cannot be on a run queue */
+ ASSERT(!is_on_queue(tso, from));
+
+ // ToDo: check whether graph is ever used as an rtsPackBuffer!!
+ if ((graph = (StgClosure *)PackNearbyGraph(node, tso, &size, 0)) == NULL)
+ return (OutOfHeap); /* out of heap */
+
+ /* Actual moving/copying of node is done on arrival; see FETCHREPLY */
+ /* Send a reply to the originator */
+ /* ToDo: Replace that by software costs for doing graph packing! */
+ CurrentTime[to] += size * RtsFlags.GranFlags.Costs.mpacktime;
+
+ new_event(from, to,
+ CurrentTime[to]+RtsFlags.GranFlags.Costs.latency,
+ FetchReply,
+ tso, (StgClosure *)graph, (rtsSpark*)NULL);
+
+ CurrentTime[to] += RtsFlags.GranFlags.Costs.mtidytime;
+ return (Ok);
+ } else { /* incremental (single closure) fetching */
+ /* Actual moving/copying of node is done on arrival; see FETCHREPLY */
+ /* Send a reply to the originator */
+ CurrentTime[to] += RtsFlags.GranFlags.Costs.mpacktime;
+
+ new_event(from, to,
+ CurrentTime[to]+RtsFlags.GranFlags.Costs.latency,
+ FetchReply,
+ tso, node, (rtsSpark*)NULL);
+
+ CurrentTime[to] += RtsFlags.GranFlags.Costs.mtidytime;
+ return (Ok);
+ }
+ }
+ else /* Qu'vatlh! node has been grabbed by another proc => forward */
+ {
+ PEs node_loc = where_is(node);
+ rtsTime fetchtime;
+
+ IF_GRAN_DEBUG(thunkStealing,
+ belch("== Qu'vatlh! node %p has been grabbed by PE %d from PE %d (demander=%d) @ %d\n",
+ node,node_loc,to,from,CurrentTime[to]));
+ if (RtsFlags.GranFlags.GranSimStats.Global) {
+ globalGranStats.fetch_misses++;
+ }
+
+ /* Prepare FORWARD message to proc p_new */
+ CurrentTime[to] += RtsFlags.GranFlags.Costs.mpacktime;
+
+ fetchtime = stg_max(CurrentTime[to], CurrentTime[node_loc]) +
+ RtsFlags.GranFlags.Costs.latency;
+
+ new_event(node_loc, from, fetchtime,
+ FetchNode,
+ tso, node, (rtsSpark*)NULL);
+
+ CurrentTime[to] += RtsFlags.GranFlags.Costs.mtidytime;
+
+ return (NodeHasMoved);
+ }
+}
+
+/*
+ blockFetch blocks a BlockedFetch node on some kind of black hole.
+
+ Taken from gum/HLComms.lc. [find a better place for that ?] -- HWL
+
+ {\bf Note:} In GranSim we don't have @FETCHME@ nodes and therefore don't
+ create @FMBQ@'s (FetchMe blocking queues) to cope with global
+ blocking. Instead, non-local TSO are put into the BQ in the same way as
+ local TSOs. However, we have to check if a TSO is local or global in
+ order to account for the latencies involved and for keeping track of the
+ number of fetches that are really going on.
+*/
+
+//@cindex blockFetch
+
+rtsFetchReturnCode
+blockFetch(tso, proc, bh)
+StgTSO* tso; /* TSO which gets blocked */
+PEs proc; /* PE where that tso was running */
+StgClosure* bh; /* closure to block on (BH, RBH, BQ) */
+{
+ StgInfoTable *info;
+
+ IF_GRAN_DEBUG(bq,
+ fprintf(stderr,"## blockFetch: blocking TSO %p (%d)[PE %d] on node %p (%s) [PE %d]. No graph is packed!\n",
+ tso, tso->id, proc, bh, info_type(bh), where_is(bh)));
+
+ if (!IS_BLACK_HOLE(bh)) { /* catches BHs and RBHs */
+ IF_GRAN_DEBUG(bq,
+ fprintf(stderr,"## blockFetch: node %p (%s) is not a BH => awakening TSO %p (%d) [PE %u]\n",
+ bh, info_type(bh), tso, tso->id, proc));
+
+ /* No BH anymore => immediately unblock tso */
+ new_event(proc, proc, CurrentTime[proc],
+ UnblockThread,
+ tso, bh, (rtsSpark*)NULL);
+
+ /* Is this always a REPLY to a FETCH in the profile ? */
+ if (RtsFlags.GranFlags.GranSimStats.Full)
+ DumpRawGranEvent(proc, proc, GR_REPLY, tso, bh, (StgInt)0, 0);
+ return (NodeIsNoBH);
+ }
+
+ /* DaH {BQ}Daq Qu' Suq 'e' wISov!
+ Now we know that we have to put the tso into the BQ.
+ 2 cases: If block-on-fetch, tso is at head of threadq =>
+ => take it out of threadq and into BQ
+ If reschedule-on-fetch, tso is only pointed to be event
+ => just put it into BQ
+
+ ngoq ngo'!!
+ if (!RtsFlags.GranFlags.DoAsyncFetch) {
+ GranSimBlock(tso, proc, bh);
+ } else {
+ if (RtsFlags.GranFlags.GranSimStats.Full)
+ DumpRawGranEvent(proc, where_is(bh), GR_BLOCK, tso, bh, (StgInt)0, 0);
+ ++(tso->gran.blockcount);
+ tso->gran.blockedat = CurrentTime[proc];
+ }
+ */
+
+ /* after scheduling the GlobalBlock event the TSO is not put into the
+ run queue again; it is only pointed to via the event we are
+ processing now; in GranSim 4.xx there is no difference between
+ synchr and asynchr comm here */
+ ASSERT(!is_on_queue(tso, proc));
+ ASSERT(tso->link == END_TSO_QUEUE);
+
+ GranSimBlock(tso, proc, bh); /* GranSim statistics gathering */
+
+ /* Now, put tso into BQ (similar to blocking entry codes) */
+ info = get_itbl(bh);
+ switch (info -> type) {
+ case RBH:
+ case BLACKHOLE:
+ case CAF_BLACKHOLE: // ToDo: check whether this is a possibly ITBL here
+ case SE_BLACKHOLE: // ToDo: check whether this is a possibly ITBL here
+ case SE_CAF_BLACKHOLE:// ToDo: check whether this is a possibly ITBL here
+ /* basically an inlined version of BLACKHOLE_entry -- HWL */
+ /* Change the BLACKHOLE into a BLACKHOLE_BQ */
+ ((StgBlockingQueue *)bh)->header.info = &BLACKHOLE_BQ_info;
+ /* Put ourselves on the blocking queue for this black hole */
+ // tso->link=END_TSO_QUEUE; not necessary; see assertion above
+ ((StgBlockingQueue *)bh)->blocking_queue = (StgBlockingQueueElement *)tso;
+ tso->block_info.closure = bh;
+ recordMutable((StgMutClosure *)bh);
+ break;
+
+ case BLACKHOLE_BQ:
+ /* basically an inlined version of BLACKHOLE_BQ_entry -- HWL */
+ tso->link = (StgTSO *) (((StgBlockingQueue*)bh)->blocking_queue);
+ ((StgBlockingQueue*)bh)->blocking_queue = (StgBlockingQueueElement *)tso;
+ recordMutable((StgMutClosure *)bh);
+
+# if 0 && defined(GC_MUT_REQUIRED)
+ ToDo: check whether recordMutable is necessary -- HWL
+ /*
+ * If we modify a black hole in the old generation, we have to make
+ * sure it goes on the mutables list
+ */
+
+ if (bh <= StorageMgrInfo.OldLim) {
+ MUT_LINK(bh) = (W_) StorageMgrInfo.OldMutables;
+ StorageMgrInfo.OldMutables = bh;
+ } else
+ MUT_LINK(bh) = MUT_NOT_LINKED;
+# endif
+ break;
+
+ case FETCH_ME_BQ:
+ barf("Qagh: FMBQ closure (%p) found in GrAnSim (TSO=%p (%d))\n",
+ bh, tso, tso->id);
+
+ default:
+ {
+ G_PRINT_NODE(bh);
+ barf("Qagh: thought %p was a black hole (IP %p (%s))",
+ bh, info, info_type(bh));
+ }
+ }
+ return (Ok);
+}
+
+
+//@node Idle PEs, Routines directly called from Haskell world, Code for Fetching Nodes, GranSim specific code
+//@subsection Idle PEs
+
+/*
+ Export work to idle PEs. This function is called from @ReSchedule@
+ before dispatching on the current event. @HandleIdlePEs@ iterates over
+ all PEs, trying to get work for idle PEs. Note, that this is a
+ simplification compared to GUM's fishing model. We try to compensate for
+ that by making the cost for stealing work dependent on the number of
+ idle processors and thereby on the probability with which a randomly
+ sent fish would find work.
+*/
+
+//@cindex handleIdlePEs
+
+void
+handleIdlePEs(void)
+{
+ PEs p;
+
+ IF_DEBUG(gran, fprintf(stderr, "GRAN: handling Idle PEs\n"))
+
+ /* Should never be entered in GrAnSim Light setup */
+ ASSERT(!RtsFlags.GranFlags.Light);
+
+ /* Could check whether there are idle PEs if it's a cheap check */
+ for (p = 0; p < RtsFlags.GranFlags.proc; p++)
+ if (procStatus[p]==Idle) /* && IS_SPARKING(p) && IS_STARTING(p) */
+ /* First look for local work i.e. examine local spark pool! */
+ if (pending_sparks_hds[p]!=(rtsSpark *)NULL) {
+ new_event(p, p, CurrentTime[p],
+ FindWork,
+ (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
+ procStatus[p] = Sparking;
+ } else if ((RtsFlags.GranFlags.maxFishes==0 ||
+ OutstandingFishes[p]<RtsFlags.GranFlags.maxFishes) ) {
+
+ /* If no local work then try to get remote work!
+ Qu' Hopbe' pagh tu'lu'pu'chugh Qu' Hop yISuq ! */
+ if (RtsFlags.GranFlags.DoStealThreadsFirst &&
+ (RtsFlags.GranFlags.FetchStrategy >= 4 || OutstandingFetches[p] == 0))
+ {
+ if (SurplusThreads > 0l) /* Steal a thread */
+ stealThread(p);
+
+ if (procStatus[p]!=Idle)
+ break;
+ }
+
+ if (SparksAvail > 0 &&
+ (RtsFlags.GranFlags.FetchStrategy >= 3 || OutstandingFetches[p] == 0)) /* Steal a spark */
+ stealSpark(p);
+
+ if (SurplusThreads > 0 &&
+ (RtsFlags.GranFlags.FetchStrategy >= 4 || OutstandingFetches[p] == 0)) /* Steal a thread */
+ stealThread(p);
+ }
+}
+
+/*
+ Steal a spark and schedule moving it to proc. We want to look at PEs in
+ clock order -- most retarded first. Currently sparks are only stolen
+ from the @ADVISORY_POOL@ never from the @REQUIRED_POOL@. Eventually,
+ this should be changed to first steal from the former then from the
+ latter.
+
+ We model a sort of fishing mechanism by counting the number of sparks
+ and threads we are currently stealing. */
+
+/*
+ Return a random nat value in the intervall [from, to)
+*/
+static nat
+natRandom(from, to)
+nat from, to;
+{
+ nat r, d;
+
+ ASSERT(from<=to);
+ d = to - from;
+ /* random returns a value in [0, RAND_MAX] */
+ r = (nat) ((float)from + ((float)random()*(float)d)/(float)RAND_MAX);
+ r = (r==to) ? from : r;
+ ASSERT(from<=r && (r<to || from==to));
+ return r;
+}
+
+/*
+ Find any PE other than proc. Used for GUM style fishing only.
+*/
+static PEs
+findRandomPE (proc)
+PEs proc;
+{
+ nat p;
+
+ ASSERT(RtsFlags.GranFlags.Fishing);
+ if (RtsFlags.GranFlags.RandomSteal) {
+ p = natRandom(0,RtsFlags.GranFlags.proc); /* full range of PEs */
+ } else {
+ p = 0;
+ }
+ IF_GRAN_DEBUG(randomSteal,
+ belch("^^ RANDOM_STEAL (fishing): stealing from PE %d (current proc is %d)",
+ p, proc));
+
+ return (PEs)p;
+}
+
+/*
+ Magic code for stealing sparks/threads makes use of global knowledge on
+ spark queues.
+*/
+static void
+sortPEsByTime (proc, pes_by_time, firstp, np)
+PEs proc;
+PEs *pes_by_time;
+nat *firstp, *np;
+{
+ PEs p, temp, n, i, j;
+ nat first, upb, r=0, q=0;
+
+ ASSERT(!RtsFlags.GranFlags.Fishing);
+
+#if 0
+ upb = RtsFlags.GranFlags.proc; /* full range of PEs */
+
+ if (RtsFlags.GranFlags.RandomSteal) {
+ r = natRandom(0,RtsFlags.GranFlags.proc); /* full range of PEs */
+ } else {
+ r = 0;
+ }
+#endif
+
+ /* pes_by_time shall contain processors from which we may steal sparks */
+ for(n=0, p=0; p < RtsFlags.GranFlags.proc; ++p)
+ if ((proc != p) && // not the current proc
+ (pending_sparks_hds[p] != (rtsSpark *)NULL) && // non-empty spark pool
+ (CurrentTime[p] <= CurrentTime[CurrentProc]))
+ pes_by_time[n++] = p;
+
+ /* sort pes_by_time */
+ for(i=0; i < n; ++i)
+ for(j=i+1; j < n; ++j)
+ if (CurrentTime[pes_by_time[i]] > CurrentTime[pes_by_time[j]]) {
+ rtsTime temp = pes_by_time[i];
+ pes_by_time[i] = pes_by_time[j];
+ pes_by_time[j] = temp;
+ }
+
+ /* Choose random processor to steal spark from; first look at processors */
+ /* that are earlier than the current one (i.e. proc) */
+ for(first=0;
+ (first < n) && (CurrentTime[pes_by_time[first]] <= CurrentTime[proc]);
+ ++first)
+ /* nothing */ ;
+
+ /* if the assertion below is true we can get rid of first */
+ /* ASSERT(first==n); */
+ /* ToDo: check if first is really needed; find cleaner solution */
+
+ *firstp = first;
+ *np = n;
+}
+
+/*
+ Steal a spark (piece of work) from any processor and bring it to proc.
+*/
+//@cindex stealSpark
+static rtsBool
+stealSpark(PEs proc) { stealSomething(proc, rtsTrue, rtsFalse); }
+
+/*
+ Steal a thread from any processor and bring it to proc i.e. thread migration
+*/
+//@cindex stealThread
+static rtsBool
+stealThread(PEs proc) { stealSomething(proc, rtsFalse, rtsTrue); }
+
+/*
+ Steal a spark or a thread and schedule moving it to proc.
+*/
+//@cindex stealSomething
+static rtsBool
+stealSomething(proc, steal_spark, steal_thread)
+PEs proc; // PE that needs work (stealer)
+rtsBool steal_spark, steal_thread; // should a spark and/or thread be stolen
+{
+ PEs p;
+ rtsTime fish_arrival_time;
+ rtsSpark *spark, *prev, *next;
+ rtsBool stolen = rtsFalse;
+
+ ASSERT(steal_spark || steal_thread);
+
+ /* Should never be entered in GrAnSim Light setup */
+ ASSERT(!RtsFlags.GranFlags.Light);
+ ASSERT(!steal_thread || RtsFlags.GranFlags.DoThreadMigration);
+
+ if (!RtsFlags.GranFlags.Fishing) {
+ // ToDo: check if stealing threads is prefered over stealing sparks
+ if (steal_spark) {
+ if (stealSparkMagic(proc))
+ return rtsTrue;
+ else // no spark found
+ if (steal_thread)
+ return stealThreadMagic(proc);
+ else // no thread found
+ return rtsFalse;
+ } else { // ASSERT(steal_thread);
+ return stealThreadMagic(proc);
+ }
+ barf("stealSomething: never reached");
+ }
+
+ /* The rest of this function does GUM style fishing */
+
+ p = findRandomPE(proc); /* find a random PE other than proc */
+
+ /* Message packing costs for sending a Fish; qeq jabbI'ID */
+ CurrentTime[proc] += RtsFlags.GranFlags.Costs.mpacktime;
+
+ /* use another GranEvent for requesting a thread? */
+ if (steal_spark && RtsFlags.GranFlags.GranSimStats.Sparks)
+ DumpRawGranEvent(p, proc, SP_REQUESTED,
+ (StgTSO*)NULL, (StgClosure *)NULL, (StgInt)0, 0);
+
+ /* time of the fish arrival on the remote PE */
+ fish_arrival_time = CurrentTime[proc] + RtsFlags.GranFlags.Costs.latency;
+
+ /* Phps use an own Fish event for that? */
+ /* The contents of the spark component is a HACK:
+ 1 means give me a spark;
+ 2 means give me a thread
+ 0 means give me nothing (this should never happen)
+ */
+ new_event(p, proc, fish_arrival_time,
+ FindWork,
+ (StgTSO*)NULL, (StgClosure*)NULL,
+ (steal_spark ? (rtsSpark*)1 : steal_thread ? (rtsSpark*)2 : (rtsSpark*)0));
+
+ ++OutstandingFishes[proc];
+ /* only with Async fetching? */
+ if (procStatus[proc]==Idle)
+ procStatus[proc]=Fishing;
+
+ /* time needed to clean up buffers etc after sending a message */
+ CurrentTime[proc] += RtsFlags.GranFlags.Costs.mtidytime;
+
+ /* If GUM style fishing stealing always succeeds because it only consists
+ of sending out a fish; of course, when the fish may return
+ empty-handed! */
+ return rtsTrue;
+}
+
+/*
+ This version of stealing a spark makes use of the global info on all
+ spark pools etc which is not available in a real parallel system.
+ This could be extended to test e.g. the impact of perfect load information.
+*/
+//@cindex stealSparkMagic
+static rtsBool
+stealSparkMagic(proc)
+PEs proc;
+{
+ PEs p=0, i=0, j=0, n=0, first, upb;
+ rtsSpark *spark=NULL, *next;
+ PEs pes_by_time[MAX_PROC];
+ rtsBool stolen = rtsFalse;
+ rtsTime stealtime;
+
+ /* Should never be entered in GrAnSim Light setup */
+ ASSERT(!RtsFlags.GranFlags.Light);
+
+ sortPEsByTime(proc, pes_by_time, &first, &n);
+
+ while (!stolen && n>0) {
+ upb = (first==0) ? n : first;
+ i = natRandom(0,upb); /* choose a random eligible PE */
+ p = pes_by_time[i];
+
+ IF_GRAN_DEBUG(randomSteal,
+ belch("^^ stealSparkMagic (random_steal, not fishing): stealing spark from PE %d (current proc is %d)",
+ p, proc));
+
+ ASSERT(pending_sparks_hds[p]!=(rtsSpark *)NULL); /* non-empty spark pool */
+
+ /* Now go through rtsSparkQ and steal the first eligible spark */
+
+ spark = pending_sparks_hds[p];
+ while (!stolen && spark != (rtsSpark*)NULL)
+ {
+ /* NB: no prev pointer is needed here because all sparks that are not
+ chosen are pruned
+ */
+ if ((procStatus[p]==Idle || procStatus[p]==Sparking || procStatus[p] == Fishing) &&
+ spark->next==(rtsSpark*)NULL)
+ {
+ /* Be social! Don't steal the only spark of an idle processor
+ not {spark} neH yInIH !! */
+ break; /* next PE */
+ }
+ else if (closure_SHOULD_SPARK(spark->node))
+ {
+ /* Don't Steal local sparks;
+ ToDo: optionally prefer local over global sparks
+ if (!spark->global) {
+ prev=spark;
+ continue; next spark
+ }
+ */
+ /* found a spark! */
+
+ /* Prepare message for sending spark */
+ CurrentTime[p] += RtsFlags.GranFlags.Costs.mpacktime;
+
+ if (RtsFlags.GranFlags.GranSimStats.Sparks)
+ DumpRawGranEvent(p, (PEs)0, SP_EXPORTED,
+ (StgTSO*)NULL, spark->node,
+ spark->name, spark_queue_len(p));
+
+ stealtime = (CurrentTime[p] > CurrentTime[proc] ?
+ CurrentTime[p] :
+ CurrentTime[proc])
+ + sparkStealTime();
+
+ new_event(proc, p /* CurrentProc */, stealtime,
+ MoveSpark,
+ (StgTSO*)NULL, spark->node, spark);
+
+ stolen = rtsTrue;
+ ++OutstandingFishes[proc]; /* no. of sparks currently on the fly */
+ if (procStatus[proc]==Idle)
+ procStatus[proc] = Fishing;
+ ++(spark->global); /* record that this is a global spark */
+ ASSERT(SparksAvail>0);
+ --SparksAvail; /* on-the-fly sparks are not available */
+ next = delete_from_sparkq(spark, p, rtsFalse); // don't dispose!
+ CurrentTime[p] += RtsFlags.GranFlags.Costs.mtidytime;
+ }
+ else /* !(closure_SHOULD_SPARK(SPARK_NODE(spark))) */
+ {
+ IF_GRAN_DEBUG(checkSparkQ,
+ belch("^^ pruning spark %p (node %p) in stealSparkMagic",
+ spark, spark->node));
+
+ /* if the spark points to a node that should not be sparked,
+ prune the spark queue at this point */
+ if (RtsFlags.GranFlags.GranSimStats.Sparks)
+ DumpRawGranEvent(p, (PEs)0, SP_PRUNED,
+ (StgTSO*)NULL, spark->node,
+ spark->name, spark_queue_len(p));
+ if (RtsFlags.GranFlags.GranSimStats.Global)
+ globalGranStats.pruned_sparks++;
+
+ ASSERT(SparksAvail>0);
+ --SparksAvail;
+ spark = delete_from_sparkq(spark, p, rtsTrue);
+ }
+ /* unlink spark (may have been freed!) from sparkq;
+ if (prev == NULL) // spark was head of spark queue
+ pending_sparks_hds[p] = spark->next;
+ else
+ prev->next = spark->next;
+ if (spark->next == NULL)
+ pending_sparks_tls[p] = prev;
+ else
+ next->prev = prev;
+ */
+ } /* while ... iterating over sparkq */
+
+ /* ToDo: assert that PE p still has work left after stealing the spark */
+
+ if (!stolen && (n>0)) { /* nothing stealable from proc p :( */
+ ASSERT(pes_by_time[i]==p);
+
+ /* remove p from the list (at pos i) */
+ for (j=i; j+1<n; j++)
+ pes_by_time[j] = pes_by_time[j+1];
+ n--;
+
+ /* update index to first proc which is later (or equal) than proc */
+ for ( ;
+ (first>0) &&
+ (CurrentTime[pes_by_time[first-1]]>CurrentTime[proc]);
+ first--)
+ /* nothing */ ;
+ }
+ } /* while ... iterating over PEs in pes_by_time */
+
+ IF_GRAN_DEBUG(randomSteal,
+ if (stolen)
+ belch("^^ stealSparkMagic: spark %p (node=%p) stolen by PE %d from PE %d (SparksAvail=%d; idlers=%d)",
+ spark, spark->node, proc, p,
+ SparksAvail, idlers());
+ else
+ belch("^^ stealSparkMagic: nothing stolen by PE %d (sparkq len after pruning=%d)(SparksAvail=%d; idlers=%d)",
+ proc, SparksAvail, idlers()));
+
+ if (RtsFlags.GranFlags.GranSimStats.Global &&
+ stolen && (i!=0)) { /* only for statistics */
+ globalGranStats.rs_sp_count++;
+ globalGranStats.ntimes_total += n;
+ globalGranStats.fl_total += first;
+ globalGranStats.no_of_steals++;
+ }
+
+ return stolen;
+}
+
+/*
+ The old stealThread code, which makes use of global info and does not
+ send out fishes.
+ NB: most of this is the same as in stealSparkMagic;
+ only the pieces specific to processing thread queues are different;
+ long live polymorphism!
+*/
+
+//@cindex stealThreadMagic
+static rtsBool
+stealThreadMagic(proc)
+PEs proc;
+{
+ PEs p=0, i=0, j=0, n=0, first, upb;
+ StgTSO *tso=END_TSO_QUEUE;
+ PEs pes_by_time[MAX_PROC];
+ rtsBool stolen = rtsFalse;
+ rtsTime stealtime;
+
+ /* Should never be entered in GrAnSim Light setup */
+ ASSERT(!RtsFlags.GranFlags.Light);
+
+ sortPEsByTime(proc, pes_by_time, &first, &n);
+
+ while (!stolen && n>0) {
+ upb = (first==0) ? n : first;
+ i = natRandom(0,upb); /* choose a random eligible PE */
+ p = pes_by_time[i];
+
+ IF_GRAN_DEBUG(randomSteal,
+ belch("^^ stealThreadMagic (random_steal, not fishing): stealing thread from PE %d (current proc is %d)",
+ p, proc));
+
+ /* Steal the first exportable thread in the runnable queue but
+ never steal the first in the queue for social reasons;
+ not Qu' wa'DIch yInIH !!
+ */
+ /* Would be better to search through queue and have options which of
+ the threads to pick when stealing */
+ if (run_queue_hds[p] == END_TSO_QUEUE) {
+ IF_GRAN_DEBUG(randomSteal,
+ belch("^^ stealThreadMagic: No thread to steal from PE %d (stealer=PE %d)",
+ p, proc));
+ } else {
+ tso = run_queue_hds[p]->link; /* tso is *2nd* thread in thread queue */
+ /* Found one */
+ stolen = rtsTrue;
+
+ /* update links in queue */
+ run_queue_hds[p]->link = tso->link;
+ if (run_queue_tls[p] == tso)
+ run_queue_tls[p] = run_queue_hds[p];
+
+ /* ToDo: Turn magic constants into params */
+
+ CurrentTime[p] += 5l * RtsFlags.GranFlags.Costs.mpacktime;
+
+ stealtime = (CurrentTime[p] > CurrentTime[proc] ?
+ CurrentTime[p] :
+ CurrentTime[proc])
+ + sparkStealTime()
+ + 4l * RtsFlags.GranFlags.Costs.additional_latency
+ + 5l * RtsFlags.GranFlags.Costs.munpacktime;
+
+ /* Move the thread; set bitmask to 0 while TSO is `on-the-fly' */
+ SET_GRAN_HDR(tso,Nowhere /* PE_NUMBER(proc) */);
+
+ /* Move from one queue to another */
+ new_event(proc, p, stealtime,
+ MoveThread,
+ tso, (StgClosure*)NULL, (rtsSpark*)NULL);
+
+ /* MAKE_BUSY(proc); not yet; only when thread is in threadq */
+ ++OutstandingFishes[proc];
+ if (procStatus[proc])
+ procStatus[proc] = Fishing;
+ --SurplusThreads;
+
+ if(RtsFlags.GranFlags.GranSimStats.Full)
+ DumpRawGranEvent(p, proc,
+ GR_STEALING,
+ tso, (StgClosure*)NULL, (StgInt)0, 0);
+
+ /* costs for tidying up buffer after having sent it */
+ CurrentTime[p] += 5l * RtsFlags.GranFlags.Costs.mtidytime;
+ }
+
+ /* ToDo: assert that PE p still has work left after stealing the spark */
+
+ if (!stolen && (n>0)) { /* nothing stealable from proc p :( */
+ ASSERT(pes_by_time[i]==p);
+
+ /* remove p from the list (at pos i) */
+ for (j=i; j+1<n; j++)
+ pes_by_time[j] = pes_by_time[j+1];
+ n--;
+
+ /* update index to first proc which is later (or equal) than proc */
+ for ( ;
+ (first>0) &&
+ (CurrentTime[pes_by_time[first-1]]>CurrentTime[proc]);
+ first--)
+ /* nothing */ ;
+ }
+ } /* while ... iterating over PEs in pes_by_time */
+
+ IF_GRAN_DEBUG(randomSteal,
+ if (stolen)
+ belch("^^ stealThreadMagic: stolen TSO %d (%p) by PE %d from PE %d (SparksAvail=%d; idlers=%d)",
+ tso->id, tso, proc, p,
+ SparksAvail, idlers());
+ else
+ belch("stealThreadMagic: nothing stolen by PE %d (SparksAvail=%d; idlers=%d)",
+ proc, SparksAvail, idlers()));
+
+ if (RtsFlags.GranFlags.GranSimStats.Global &&
+ stolen && (i!=0)) { /* only for statistics */
+ /* ToDo: more statistics on avg thread queue lenght etc */
+ globalGranStats.rs_t_count++;
+ globalGranStats.no_of_migrates++;
+ }
+
+ return stolen;
+}
+
+//@cindex sparkStealTime
+static rtsTime
+sparkStealTime(void)
+{
+ double fishdelay, sparkdelay, latencydelay;
+ fishdelay = (double)RtsFlags.GranFlags.proc/2;
+ sparkdelay = fishdelay -
+ ((fishdelay-1.0)/(double)(RtsFlags.GranFlags.proc-1))*((double)idlers());
+ latencydelay = sparkdelay*((double)RtsFlags.GranFlags.Costs.latency);
+
+ return((rtsTime)latencydelay);
+}
+
+//@node Routines directly called from Haskell world, Emiting profiling info for GrAnSim, Idle PEs, GranSim specific code
+//@subsection Routines directly called from Haskell world
+/*
+The @GranSim...@ routines in here are directly called via macros from the
+threaded world.
+
+First some auxiliary routines.
+*/
+
+/* Take the current thread off the thread queue and thereby activate the
+ next thread. It's assumed that the next ReSchedule after this uses
+ NEW_THREAD as param.
+ This fct is called from GranSimBlock and GranSimFetch
+*/
+
+//@cindex ActivateNextThread
+
+void
+ActivateNextThread (proc)
+PEs proc;
+{
+ StgTSO *t;
+ /*
+ This routine is entered either via GranSimFetch or via GranSimBlock.
+ It has to prepare the CurrentTSO for being blocked and update the
+ run queue and other statistics on PE proc. The actual enqueuing to the
+ blocking queue (if coming from GranSimBlock) is done in the entry code
+ of the BLACKHOLE and BLACKHOLE_BQ closures (see StgMiscClosures.hc).
+ */
+ /* ToDo: add assertions here!! */
+ //ASSERT(run_queue_hds[proc]!=END_TSO_QUEUE);
+
+ // Only necessary if the running thread is at front of the queue
+ // run_queue_hds[proc] = run_queue_hds[proc]->link;
+ ASSERT(CurrentProc==proc);
+ ASSERT(!is_on_queue(CurrentTSO,proc));
+ if (run_queue_hds[proc]==END_TSO_QUEUE) {
+ /* NB: this routine is only entered with asynchr comm (see assertion) */
+ procStatus[proc] = Idle;
+ } else {
+ /* ToDo: check cost assignment */
+ CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadcontextswitchtime;
+ if (RtsFlags.GranFlags.GranSimStats.Full &&
+ (!RtsFlags.GranFlags.Light || RtsFlags.GranFlags.Debug.checkLight))
+ /* right flag !?? ^^^ */
+ DumpRawGranEvent(proc, 0, GR_SCHEDULE, run_queue_hds[proc],
+ (StgClosure*)NULL, (StgInt)0, 0);
+ }
+}
+
+/*
+ The following GranSim fcts are stg-called from the threaded world.
+*/
+
+/* Called from HP_CHK and friends (see StgMacros.h) */
+//@cindex GranSimAllocate
+void
+GranSimAllocate(n)
+StgInt n;
+{
+ CurrentTSO->gran.allocs += n;
+ ++(CurrentTSO->gran.basicblocks);
+
+ if (RtsFlags.GranFlags.GranSimStats.Heap) {
+ DumpRawGranEvent(CurrentProc, 0, GR_ALLOC, CurrentTSO,
+ (StgClosure*)NULL, (StgInt)0, n);
+ }
+
+ CurrentTSO->gran.exectime += RtsFlags.GranFlags.Costs.heapalloc_cost;
+ CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.heapalloc_cost;
+}
+
+/*
+ Subtract the values added above, if a heap check fails and
+ so has to be redone.
+*/
+//@cindex GranSimUnallocate
+void
+GranSimUnallocate(n)
+StgInt n;
+{
+ CurrentTSO->gran.allocs -= n;
+ --(CurrentTSO->gran.basicblocks);
+
+ CurrentTSO->gran.exectime -= RtsFlags.GranFlags.Costs.heapalloc_cost;
+ CurrentTime[CurrentProc] -= RtsFlags.GranFlags.Costs.heapalloc_cost;
+}
+
+/* NB: We now inline this code via GRAN_EXEC rather than calling this fct */
+//@cindex GranSimExec
+void
+GranSimExec(ariths,branches,loads,stores,floats)
+StgWord ariths,branches,loads,stores,floats;
+{
+ StgWord cost = RtsFlags.GranFlags.Costs.arith_cost*ariths +
+ RtsFlags.GranFlags.Costs.branch_cost*branches +
+ RtsFlags.GranFlags.Costs.load_cost * loads +
+ RtsFlags.GranFlags.Costs.store_cost*stores +
+ RtsFlags.GranFlags.Costs.float_cost*floats;
+
+ CurrentTSO->gran.exectime += cost;
+ CurrentTime[CurrentProc] += cost;
+}
+
+/*
+ Fetch the node if it isn't local
+ -- result indicates whether fetch has been done.
+
+ This is GRIP-style single item fetching.
+*/
+
+//@cindex GranSimFetch
+StgInt
+GranSimFetch(node /* , liveness_mask */ )
+StgClosure *node;
+/* StgInt liveness_mask; */
+{
+ /* reset the return value (to be checked within STG land) */
+ NeedToReSchedule = rtsFalse;
+
+ if (RtsFlags.GranFlags.Light) {
+ /* Always reschedule in GrAnSim-Light to prevent one TSO from
+ running off too far
+ new_event(CurrentProc,CurrentProc,CurrentTime[CurrentProc],
+ ContinueThread,CurrentTSO,node,NULL);
+ */
+ return(0);
+ }
+
+ /* Faking an RBH closure:
+ If the bitmask of the closure is 0 then this node is a fake RBH;
+ */
+ if (node->header.gran.procs == Nowhere) {
+ IF_GRAN_DEBUG(bq,
+ belch("## Found fake RBH (node %p); delaying TSO %d (%p)",
+ node, CurrentTSO->id, CurrentTSO));
+
+ new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc]+10000,
+ ContinueThread, CurrentTSO, node, (rtsSpark*)NULL);
+
+ /* Rescheduling (GranSim internal) is necessary */
+ NeedToReSchedule = rtsTrue;
+
+ return(1);
+ }
+
+ /* Note: once a node has been fetched, this test will be passed */
+ if (!IS_LOCAL_TO(PROCS(node),CurrentProc))
+ {
+ PEs p = where_is(node);
+ rtsTime fetchtime;
+
+ IF_GRAN_DEBUG(thunkStealing,
+ if (p==CurrentProc)
+ belch("GranSimFetch: Trying to fetch from own processor%u\n", p););
+
+ CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime;
+ /* NB: Fetch is counted on arrival (FetchReply) */
+
+ fetchtime = stg_max(CurrentTime[CurrentProc],CurrentTime[p]) +
+ RtsFlags.GranFlags.Costs.latency;
+
+ new_event(p, CurrentProc, fetchtime,
+ FetchNode, CurrentTSO, node, (rtsSpark*)NULL);
+
+ if (fetchtime<TimeOfNextEvent)
+ TimeOfNextEvent = fetchtime;
+
+ /* About to block */
+ CurrentTSO->gran.blockedat = CurrentTime[CurrentProc];
+
+ ++OutstandingFetches[CurrentProc];
+
+ if (RtsFlags.GranFlags.DoAsyncFetch)
+ /* if asynchr comm is turned on, activate the next thread in the q */
+ ActivateNextThread(CurrentProc);
+ else
+ procStatus[CurrentProc] = Fetching;
+
+#if 0
+ /* ToDo: nuke the entire if (anything special for fair schedule?) */
+ if (RtsFlags.GranFlags.DoAsyncFetch)
+ {
+ /* Remove CurrentTSO from the queue -- assumes head of queue == CurrentTSO */
+ if(!RtsFlags.GranFlags.DoFairSchedule)
+ {
+ /* now done in do_the_fetchnode
+ if (RtsFlags.GranFlags.GranSimStats.Full)
+ DumpRawGranEvent(CurrentProc, p, GR_FETCH, CurrentTSO,
+ node, (StgInt)0, 0);
+ */
+ ActivateNextThread(CurrentProc);
+
+# if 0 && defined(GRAN_CHECK)
+ if (RtsFlags.GranFlags.Debug.blockOnFetch_sanity) {
+ if (TSO_TYPE(CurrentTSO) & FETCH_MASK_TSO) {
+ fprintf(stderr,"FetchNode: TSO 0x%x has fetch-mask set @ %d\n",
+ CurrentTSO,CurrentTime[CurrentProc]);
+ stg_exit(EXIT_FAILURE);
+ } else {
+ TSO_TYPE(CurrentTSO) |= FETCH_MASK_TSO;
+ }
+ }
+# endif
+ CurrentTSO->link = END_TSO_QUEUE;
+ /* CurrentTSO = END_TSO_QUEUE; */
+
+ /* CurrentTSO is pointed to by the FetchNode event; it is
+ on no run queue any more */
+ } else { /* fair scheduling currently not supported -- HWL */
+ barf("Asynchr communication is not yet compatible with fair scheduling\n");
+ }
+ } else { /* !RtsFlags.GranFlags.DoAsyncFetch */
+ procStatus[CurrentProc] = Fetching; // ToDo: BlockedOnFetch;
+ /* now done in do_the_fetchnode
+ if (RtsFlags.GranFlags.GranSimStats.Full)
+ DumpRawGranEvent(CurrentProc, p,
+ GR_FETCH, CurrentTSO, node, (StgInt)0, 0);
+ */
+ IF_GRAN_DEBUG(blockOnFetch,
+ BlockedOnFetch[CurrentProc] = CurrentTSO;); /*- rtsTrue; -*/
+ }
+#endif /* 0 */
+
+ CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mtidytime;
+
+ /* Rescheduling (GranSim internal) is necessary */
+ NeedToReSchedule = rtsTrue;
+
+ return(1);
+ }
+ return(0);
+}
+
+//@cindex GranSimSpark
+void
+GranSimSpark(local,node)
+StgInt local;
+StgClosure *node;
+{
+ /* ++SparksAvail; Nope; do that in add_to_spark_queue */
+ if (RtsFlags.GranFlags.GranSimStats.Sparks)
+ DumpRawGranEvent(CurrentProc, (PEs)0, SP_SPARK,
+ END_TSO_QUEUE, node, (StgInt)0, spark_queue_len(CurrentProc)-1);
+
+ /* Force the PE to take notice of the spark */
+ if(RtsFlags.GranFlags.DoAlwaysCreateThreads) {
+ new_event(CurrentProc,CurrentProc,CurrentTime[CurrentProc],
+ FindWork,
+ END_TSO_QUEUE, (StgClosure*)NULL, (rtsSpark*)NULL);
+ if (CurrentTime[CurrentProc]<TimeOfNextEvent)
+ TimeOfNextEvent = CurrentTime[CurrentProc];
+ }
+
+ if(local)
+ ++CurrentTSO->gran.localsparks;
+ else
+ ++CurrentTSO->gran.globalsparks;
+}
+
+//@cindex GranSimSparkAt
+void
+GranSimSparkAt(spark,where,identifier)
+rtsSpark *spark;
+StgClosure *where; /* This should be a node; alternatively could be a GA */
+StgInt identifier;
+{
+ PEs p = where_is(where);
+ GranSimSparkAtAbs(spark,p,identifier);
+}
+
+//@cindex GranSimSparkAtAbs
+void
+GranSimSparkAtAbs(spark,proc,identifier)
+rtsSpark *spark;
+PEs proc;
+StgInt identifier;
+{
+ rtsTime exporttime;
+
+ if (spark == (rtsSpark *)NULL) /* Note: Granularity control might have */
+ return; /* turned a spark into a NULL. */
+
+ /* ++SparksAvail; Nope; do that in add_to_spark_queue */
+ if(RtsFlags.GranFlags.GranSimStats.Sparks)
+ DumpRawGranEvent(proc,0,SP_SPARKAT,
+ END_TSO_QUEUE, spark->node, (StgInt)0, spark_queue_len(proc));
+
+ if (proc!=CurrentProc) {
+ CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime;
+ exporttime = (CurrentTime[proc] > CurrentTime[CurrentProc]?
+ CurrentTime[proc]: CurrentTime[CurrentProc])
+ + RtsFlags.GranFlags.Costs.latency;
+ } else {
+ exporttime = CurrentTime[CurrentProc];
+ }
+
+ if ( RtsFlags.GranFlags.Light )
+ /* Need CurrentTSO in event field to associate costs with creating
+ spark even in a GrAnSim Light setup */
+ new_event(proc, CurrentProc, exporttime,
+ MoveSpark,
+ CurrentTSO, spark->node, spark);
+ else
+ new_event(proc, CurrentProc, exporttime,
+ MoveSpark, (StgTSO*)NULL, spark->node, spark);
+ /* Bit of a hack to treat placed sparks the same as stolen sparks */
+ ++OutstandingFishes[proc];
+
+ /* Force the PE to take notice of the spark (FINDWORK is put after a
+ MoveSpark into the sparkq!) */
+ if (RtsFlags.GranFlags.DoAlwaysCreateThreads) {
+ new_event(CurrentProc,CurrentProc,exporttime+1,
+ FindWork,
+ (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
+ }
+
+ if (exporttime<TimeOfNextEvent)
+ TimeOfNextEvent = exporttime;
+
+ if (proc!=CurrentProc) {
+ CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mtidytime;
+ ++CurrentTSO->gran.globalsparks;
+ } else {
+ ++CurrentTSO->gran.localsparks;
+ }
+}
+
+/*
+ This function handles local and global blocking. It's called either
+ from threaded code (RBH_entry, BH_entry etc) or from blockFetch when
+ trying to fetch an BH or RBH
+*/
+
+//@cindex GranSimBlock
+void
+GranSimBlock(tso, proc, node)
+StgTSO *tso;
+PEs proc;
+StgClosure *node;
+{
+ PEs node_proc = where_is(node),
+ tso_proc = where_is((StgClosure *)tso);
+
+ ASSERT(tso_proc==CurrentProc);
+ // ASSERT(node_proc==CurrentProc);
+ IF_GRAN_DEBUG(bq,
+ if (node_proc!=CurrentProc)
+ belch("## ghuH: TSO %d (%lx) [PE %d] blocks on non-local node %p [PE %d] (no simulation of FETCHMEs)",
+ tso->id, tso, tso_proc, node, node_proc));
+ ASSERT(tso->link==END_TSO_QUEUE);
+ ASSERT(!is_on_queue(tso,proc)); // tso must not be on run queue already!
+ //ASSERT(tso==run_queue_hds[proc]);
+
+ IF_DEBUG(gran,
+ belch("GRAN: TSO %d (%p) [PE %d] blocks on closure %p @ %lx",
+ tso->id, tso, proc, node, CurrentTime[proc]));
+
+
+ /* THIS SHOULD NEVER HAPPEN!
+ If tso tries to block on a remote node (i.e. node_proc!=CurrentProc)
+ we have missed a GranSimFetch before entering this closure;
+ we hack around it for now, faking a FetchNode;
+ because GranSimBlock is entered via a BLACKHOLE(_BQ) closure,
+ tso will be blocked on this closure until the FetchReply occurs.
+
+ ngoq Dogh!
+
+ if (node_proc!=CurrentProc) {
+ StgInt ret;
+ ret = GranSimFetch(node);
+ IF_GRAN_DEBUG(bq,
+ if (ret)
+ belch(".. GranSimBlock: faking a FetchNode of node %p from %d to %d",
+ node, node_proc, CurrentProc););
+ return;
+ }
+ */
+
+ if (RtsFlags.GranFlags.GranSimStats.Full)
+ DumpRawGranEvent(proc,node_proc,GR_BLOCK,tso,node,(StgInt)0,0);
+
+ ++(tso->gran.blockcount);
+ /* Distinction between local and global block is made in blockFetch */
+ tso->gran.blockedat = CurrentTime[proc];
+
+ CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadqueuetime;
+ ActivateNextThread(proc);
+ /* tso->link = END_TSO_QUEUE; not really necessary; only for testing */
+}
+
+#endif /* GRAN */
+
+//@node Index, , Dumping routines, GranSim specific code
+//@subsection Index
+
+//@index
+//* ActivateNextThread:: @cindex\s-+ActivateNextThread
+//* CurrentProc:: @cindex\s-+CurrentProc
+//* CurrentTime:: @cindex\s-+CurrentTime
+//* GranSimAllocate:: @cindex\s-+GranSimAllocate
+//* GranSimBlock:: @cindex\s-+GranSimBlock
+//* GranSimExec:: @cindex\s-+GranSimExec
+//* GranSimFetch:: @cindex\s-+GranSimFetch
+//* GranSimLight_insertThread:: @cindex\s-+GranSimLight_insertThread
+//* GranSimSpark:: @cindex\s-+GranSimSpark
+//* GranSimSparkAt:: @cindex\s-+GranSimSparkAt
+//* GranSimSparkAtAbs:: @cindex\s-+GranSimSparkAtAbs
+//* GranSimUnallocate:: @cindex\s-+GranSimUnallocate
+//* any_idle:: @cindex\s-+any_idle
+//* blockFetch:: @cindex\s-+blockFetch
+//* do_the_fetchnode:: @cindex\s-+do_the_fetchnode
+//* do_the_fetchreply:: @cindex\s-+do_the_fetchreply
+//* do_the_findwork:: @cindex\s-+do_the_findwork
+//* do_the_globalblock:: @cindex\s-+do_the_globalblock
+//* do_the_movespark:: @cindex\s-+do_the_movespark
+//* do_the_movethread:: @cindex\s-+do_the_movethread
+//* do_the_startthread:: @cindex\s-+do_the_startthread
+//* do_the_unblock:: @cindex\s-+do_the_unblock
+//* fetchNode:: @cindex\s-+fetchNode
+//* ga_to_proc:: @cindex\s-+ga_to_proc
+//* get_next_event:: @cindex\s-+get_next_event
+//* get_time_of_next_event:: @cindex\s-+get_time_of_next_event
+//* grab_event:: @cindex\s-+grab_event
+//* handleFetchRequest:: @cindex\s-+handleFetchRequest
+//* handleIdlePEs:: @cindex\s-+handleIdlePEs
+//* idlers:: @cindex\s-+idlers
+//* insertThread:: @cindex\s-+insertThread
+//* insert_event:: @cindex\s-+insert_event
+//* is_on_queue:: @cindex\s-+is_on_queue
+//* is_unique:: @cindex\s-+is_unique
+//* new_event:: @cindex\s-+new_event
+//* prepend_event:: @cindex\s-+prepend_event
+//* print_event:: @cindex\s-+print_event
+//* print_eventq:: @cindex\s-+print_eventq
+//* prune_eventq :: @cindex\s-+prune_eventq
+//* spark queue:: @cindex\s-+spark queue
+//* sparkStealTime:: @cindex\s-+sparkStealTime
+//* stealSomething:: @cindex\s-+stealSomething
+//* stealSpark:: @cindex\s-+stealSpark
+//* stealSparkMagic:: @cindex\s-+stealSparkMagic
+//* stealThread:: @cindex\s-+stealThread
+//* stealThreadMagic:: @cindex\s-+stealThreadMagic
+//* thread_queue_len:: @cindex\s-+thread_queue_len
+//* traverse_eventq_for_gc:: @cindex\s-+traverse_eventq_for_gc
+//* where_is:: @cindex\s-+where_is
+//@end index