Diffstat (limited to 'rts/Threads.c')
-rw-r--r--    rts/Threads.c    974
1 file changed, 974 insertions(+), 0 deletions(-)
diff --git a/rts/Threads.c b/rts/Threads.c
new file mode 100644
index 0000000000..b550cc6aaa
--- /dev/null
+++ b/rts/Threads.c
@@ -0,0 +1,974 @@
+/* ---------------------------------------------------------------------------
+ *
+ * (c) The GHC Team, 2006
+ *
+ * Thread-related functionality
+ *
+ * --------------------------------------------------------------------------*/
+
+#include "PosixSource.h"
+#include "Rts.h"
+#include "SchedAPI.h"
+#include "Storage.h"
+#include "Threads.h"
+#include "RtsFlags.h"
+#include "STM.h"
+#include "Schedule.h"
+#include "Trace.h"
+#include "ThreadLabels.h"
+
+/* Next thread ID to allocate.
+ * LOCK: sched_mutex
+ */
+static StgThreadID next_thread_id = 1;
+
+/* The smallest stack size that makes any sense is:
+ * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
+ * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
+ * + 1 (the closure to enter)
+ * + 1 (stg_ap_v_ret)
+ * + 1 (spare slot req'd by stg_ap_v_ret)
+ * (the three single words above account for the "+ 3" in the macro below)
+ *
+ * A thread with this stack will bomb immediately with a stack
+ * overflow, which will increase its stack size.
+ */
+#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
+
+/* ---------------------------------------------------------------------------
+ Create a new thread.
+
+ The new thread starts with the given stack size. Before the
+ scheduler can run, however, this thread needs to have a closure
+ (and possibly some arguments) pushed on its stack. See
+ pushClosure() in Schedule.h.
+
+   createGenThread() and createIOThread() (in SchedAPI.h) are
+   convenient packaged versions of this function; a sketch of such a
+   wrapper follows the function body below.
+   ------------------------------------------------------------------------ */
+#if defined(GRAN)
+/* currently pri (priority) is only used in a GRAN setup -- HWL */
+StgTSO *
+createThread(nat size, StgInt pri)
+#else
+StgTSO *
+createThread(Capability *cap, nat size)
+#endif
+{
+ StgTSO *tso;
+ nat stack_size;
+
+ /* sched_mutex is *not* required */
+
+ /* First check whether we should create a thread at all */
+#if defined(PARALLEL_HASKELL)
+ /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
+ if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
+ threadsIgnored++;
+ debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
+ RtsFlags.ParFlags.maxThreads, advisory_thread_count);
+ return END_TSO_QUEUE;
+ }
+ threadsCreated++;
+#endif
+
+#if defined(GRAN)
+ ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
+#endif
+
+ // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
+
+ /* catch ridiculously small stack sizes */
+ if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
+ size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
+ }
+
+ stack_size = size - TSO_STRUCT_SIZEW;
+
+ tso = (StgTSO *)allocateLocal(cap, size);
+ TICK_ALLOC_TSO(stack_size, 0);
+
+ SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
+#if defined(GRAN)
+ SET_GRAN_HDR(tso, ThisPE);
+#endif
+
+ // Always start with the compiled code evaluator
+ tso->what_next = ThreadRunGHC;
+
+ tso->why_blocked = NotBlocked;
+ tso->blocked_exceptions = END_TSO_QUEUE;
+ tso->flags = TSO_DIRTY;
+
+ tso->saved_errno = 0;
+ tso->bound = NULL;
+ tso->cap = cap;
+
+ tso->stack_size = stack_size;
+ tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
+ - TSO_STRUCT_SIZEW;
+ tso->sp = (P_)&(tso->stack) + stack_size;
+
+ tso->trec = NO_TREC;
+
+#ifdef PROFILING
+ tso->prof.CCCS = CCS_MAIN;
+#endif
+
+ /* put a stop frame on the stack */
+ tso->sp -= sizeofW(StgStopFrame);
+ SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
+ tso->link = END_TSO_QUEUE;
+
+ // ToDo: check this
+#if defined(GRAN)
+ /* uses more flexible routine in GranSim */
+ insertThread(tso, CurrentProc);
+#else
+ /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
+ * from its creation
+ */
+#endif
+
+#if defined(GRAN)
+ if (RtsFlags.GranFlags.GranSimStats.Full)
+ DumpGranEvent(GR_START,tso);
+#elif defined(PARALLEL_HASKELL)
+ if (RtsFlags.ParFlags.ParStats.Full)
+ DumpGranEvent(GR_STARTQ,tso);
+  /* HACK to avoid SCHEDULE
+ LastTSO = tso; */
+#endif
+
+ /* Link the new thread on the global thread list.
+ */
+ ACQUIRE_LOCK(&sched_mutex);
+ tso->id = next_thread_id++; // while we have the mutex
+ tso->global_link = all_threads;
+ all_threads = tso;
+ RELEASE_LOCK(&sched_mutex);
+
+#if defined(DIST)
+ tso->dist.priority = MandatoryPriority; //by default that is...
+#endif
+
+#if defined(GRAN)
+ tso->gran.pri = pri;
+# if defined(DEBUG)
+ tso->gran.magic = TSO_MAGIC; // debugging only
+# endif
+ tso->gran.sparkname = 0;
+ tso->gran.startedat = CURRENT_TIME;
+ tso->gran.exported = 0;
+ tso->gran.basicblocks = 0;
+ tso->gran.allocs = 0;
+ tso->gran.exectime = 0;
+ tso->gran.fetchtime = 0;
+ tso->gran.fetchcount = 0;
+ tso->gran.blocktime = 0;
+ tso->gran.blockcount = 0;
+ tso->gran.blockedat = 0;
+ tso->gran.globalsparks = 0;
+ tso->gran.localsparks = 0;
+ if (RtsFlags.GranFlags.Light)
+ tso->gran.clock = Now; /* local clock */
+ else
+ tso->gran.clock = 0;
+
+ IF_DEBUG(gran,printTSO(tso));
+#elif defined(PARALLEL_HASKELL)
+# if defined(DEBUG)
+ tso->par.magic = TSO_MAGIC; // debugging only
+# endif
+ tso->par.sparkname = 0;
+ tso->par.startedat = CURRENT_TIME;
+ tso->par.exported = 0;
+ tso->par.basicblocks = 0;
+ tso->par.allocs = 0;
+ tso->par.exectime = 0;
+ tso->par.fetchtime = 0;
+ tso->par.fetchcount = 0;
+ tso->par.blocktime = 0;
+ tso->par.blockcount = 0;
+ tso->par.blockedat = 0;
+ tso->par.globalsparks = 0;
+ tso->par.localsparks = 0;
+#endif
+
+#if defined(GRAN)
+ globalGranStats.tot_threads_created++;
+ globalGranStats.threads_created_on_PE[CurrentProc]++;
+ globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
+ globalGranStats.tot_sq_probes++;
+#elif defined(PARALLEL_HASKELL)
+ // collect parallel global statistics (currently done together with GC stats)
+ if (RtsFlags.ParFlags.ParStats.Global &&
+ RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
+ //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime());
+ globalParStats.tot_threads_created++;
+ }
+#endif
+
+#if defined(GRAN)
+  debugTrace(GRAN_DEBUG_pri,
+	     "==__ schedule: Created TSO %d (%p) on PE %d;",
+	     tso->id, tso, CurrentProc);
+#elif defined(PARALLEL_HASKELL)
+  debugTrace(PAR_DEBUG_verbose,
+	     "==__ schedule: Created TSO %ld (%p); %d threads active",
+	     (long)tso->id, tso, advisory_thread_count);
+#else
+ debugTrace(DEBUG_sched,
+ "created thread %ld, stack size = %lx words",
+ (long)tso->id, (long)tso->stack_size);
+#endif
+ return tso;
+}
+
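+/* For reference, a wrapper in the style of createIOThread() (SchedAPI.h)
+ * looks roughly like the sketch below -- create the TSO, then push the
+ * return convention, the closure to enter, and the entry code onto its
+ * stack.  (A sketch only; see SchedAPI.h for the authoritative
+ * definitions.)
+ *
+ *   StgTSO *t = createThread(cap, stack_size);
+ *   pushClosure(t, (W_)&stg_ap_v_info);   // stg_ap_v_ret slot (see above)
+ *   pushClosure(t, (W_)closure);          // the closure to enter
+ *   pushClosure(t, (W_)&stg_enter_info);  // enter it when the thread runs
+ *   return t;
+ */
+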
+#if defined(PAR)
+/* RFP:
+ all parallel thread creation calls should fall through the following routine.
+*/
+StgTSO *
+createThreadFromSpark(rtsSpark spark)
+{
+  StgTSO *tso;
+
+  ASSERT(spark != (rtsSpark)NULL);
+  // JB: TAKE CARE OF THIS COUNTER! BUGGY
+  if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
+    threadsIgnored++;
+    barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
+	 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
+    return END_TSO_QUEUE;
+  } else {
+    threadsCreated++;
+    tso = createThread(RtsFlags.GcFlags.initialStkSize);
+    if (tso == END_TSO_QUEUE)
+      barf("createSparkThread: Cannot create TSO");
+#if defined(DIST)
+    tso->priority = AdvisoryPriority;
+#endif
+    pushClosure(tso, spark);
+    addToRunQueue(tso);
+    advisory_thread_count++; // JB: TAKE CARE OF THIS COUNTER! BUGGY
+  }
+  return tso;
+}
+#endif
+
+/* ---------------------------------------------------------------------------
+ * Comparing Thread ids.
+ *
+ * This is used from STG land in the implementation of the
+ * instances of Eq/Ord for ThreadIds.
+ * ------------------------------------------------------------------------ */
+
+int
+cmp_thread(StgPtr tso1, StgPtr tso2)
+{
+ StgThreadID id1 = ((StgTSO *)tso1)->id;
+ StgThreadID id2 = ((StgTSO *)tso2)->id;
+
+ if (id1 < id2) return (-1);
+ if (id1 > id2) return 1;
+ return 0;
+}
+
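+/* For example, the Ord instance's `compare` inspects the sign of the
+ * result; a C caller would do likewise (sketch):
+ *
+ *   int r = cmp_thread(tso1, tso2);
+ *   // r < 0: first id smaller; r == 0: same thread; r > 0: first id larger
+ */
+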
+/* ---------------------------------------------------------------------------
+ * Fetching the ThreadID from an StgTSO.
+ *
+ * This is used in the implementation of Show for ThreadIds.
+ * ------------------------------------------------------------------------ */
+int
+rts_getThreadId(StgPtr tso)
+{
+ return ((StgTSO *)tso)->id;
+}
+
+/* -----------------------------------------------------------------------------
+ Remove a thread from a queue.
+ Fails fatally if the TSO is not on the queue.
+ -------------------------------------------------------------------------- */
+
+void
+removeThreadFromQueue (StgTSO **queue, StgTSO *tso)
+{
+ StgTSO *t, *prev;
+
+ prev = NULL;
+ for (t = *queue; t != END_TSO_QUEUE; prev = t, t = t->link) {
+ if (t == tso) {
+ if (prev) {
+ prev->link = t->link;
+ } else {
+ *queue = t->link;
+ }
+ return;
+ }
+ }
+ barf("removeThreadFromQueue: not found");
+}
+
+void
+removeThreadFromDeQueue (StgTSO **head, StgTSO **tail, StgTSO *tso)
+{
+ StgTSO *t, *prev;
+
+ prev = NULL;
+ for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->link) {
+ if (t == tso) {
+ if (prev) {
+ prev->link = t->link;
+ } else {
+ *head = t->link;
+ }
+ if (*tail == tso) {
+ if (prev) {
+ *tail = prev;
+ } else {
+ *tail = END_TSO_QUEUE;
+ }
+ }
+ return;
+ }
+ }
+  barf("removeThreadFromDeQueue: not found");
+}
+
+void
+removeThreadFromMVarQueue (StgMVar *mvar, StgTSO *tso)
+{
+ removeThreadFromDeQueue (&mvar->head, &mvar->tail, tso);
+}
+
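+/* A typical caller (sketch): when an asynchronous exception is delivered
+ * to a thread blocked in takeMVar/putMVar, the thread must first be
+ * unlinked from the MVar's queue, roughly:
+ *
+ *   if (tso->why_blocked == BlockedOnMVar) {
+ *       removeThreadFromMVarQueue((StgMVar *)tso->block_info.closure, tso);
+ *   }
+ */
+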
+/* ----------------------------------------------------------------------------
+ unblockOne()
+
+ unblock a single thread.
+ ------------------------------------------------------------------------- */
+
+#if defined(GRAN)
+STATIC_INLINE void
+unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
+{
+}
+#elif defined(PARALLEL_HASKELL)
+STATIC_INLINE void
+unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
+{
+ /* write RESUME events to log file and
+ update blocked and fetch time (depending on type of the orig closure) */
+ if (RtsFlags.ParFlags.ParStats.Full) {
+ DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
+ GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
+ 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
+ if (emptyRunQueue())
+ emitSchedule = rtsTrue;
+
+ switch (get_itbl(node)->type) {
+ case FETCH_ME_BQ:
+ ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
+ break;
+ case RBH:
+ case FETCH_ME:
+ case BLACKHOLE_BQ:
+ ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
+ break;
+#ifdef DIST
+ case MVAR:
+ break;
+#endif
+ default:
+ barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
+ }
+ }
+}
+#endif
+
+#if defined(GRAN)
+StgBlockingQueueElement *
+unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
+{
+  StgTSO *tso;
+  StgBlockingQueueElement *next;
+  PEs node_loc, tso_loc;
+
+  node_loc = where_is(node); // should be lifted out of loop
+  tso = (StgTSO *)bqe; // wastes an assignment to get the type right
+  next = bqe->link;    // save the successor; tso->link is overwritten below
+  tso_loc = where_is((StgClosure *)tso);
+ if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
+    /* !fake_fetch => "TSO is on CurrentProc" is the same as IS_LOCAL_TO */
+ ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
+ CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
+ // insertThread(tso, node_loc);
+ new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
+ ResumeThread,
+ tso, node, (rtsSpark*)NULL);
+ tso->link = END_TSO_QUEUE; // overwrite link just to be sure
+ // len_local++;
+ // len++;
+ } else { // TSO is remote (actually should be FMBQ)
+ CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
+ RtsFlags.GranFlags.Costs.gunblocktime +
+ RtsFlags.GranFlags.Costs.latency;
+ new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
+ UnblockThread,
+ tso, node, (rtsSpark*)NULL);
+ tso->link = END_TSO_QUEUE; // overwrite link just to be sure
+ // len++;
+ }
+ /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
+ IF_GRAN_DEBUG(bq,
+ debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
+ (node_loc==tso_loc ? "Local" : "Global"),
+ tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
+ tso->block_info.closure = NULL;
+  debugTrace(DEBUG_sched, "-- waking up thread %ld (%p)",
+	     tso->id, tso);
+
+  return next;
+}
+#elif defined(PARALLEL_HASKELL)
+StgBlockingQueueElement *
+unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
+{
+  StgBlockingQueueElement *next = NULL; // stays NULL in the DEBUG-only CONSTR case
+
+ switch (get_itbl(bqe)->type) {
+ case TSO:
+ ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
+ /* if it's a TSO just push it onto the run_queue */
+ next = bqe->link;
+ ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
+ APPEND_TO_RUN_QUEUE((StgTSO *)bqe);
+ threadRunnable();
+ unblockCount(bqe, node);
+ /* reset blocking status after dumping event */
+ ((StgTSO *)bqe)->why_blocked = NotBlocked;
+ break;
+
+ case BLOCKED_FETCH:
+ /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
+ next = bqe->link;
+ bqe->link = (StgBlockingQueueElement *)PendingFetches;
+ PendingFetches = (StgBlockedFetch *)bqe;
+ break;
+
+# if defined(DEBUG)
+ /* can ignore this case in a non-debugging setup;
+ see comments on RBHSave closures above */
+ case CONSTR:
+ /* check that the closure is an RBHSave closure */
+ ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
+ get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
+ get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
+ break;
+
+ default:
+ barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
+ get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
+ (StgClosure *)bqe);
+# endif
+ }
+ IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
+ return next;
+}
+#endif
+
+StgTSO *
+unblockOne (Capability *cap, StgTSO *tso)
+{
+ return unblockOne_(cap,tso,rtsTrue); // allow migration
+}
+
+StgTSO *
+unblockOne_ (Capability *cap, StgTSO *tso,
+ rtsBool allow_migrate USED_IF_THREADS)
+{
+ StgTSO *next;
+
+ ASSERT(get_itbl(tso)->type == TSO);
+ ASSERT(tso->why_blocked != NotBlocked);
+
+ tso->why_blocked = NotBlocked;
+ next = tso->link;
+ tso->link = END_TSO_QUEUE;
+
+#if defined(THREADED_RTS)
+ if (tso->cap == cap || (!tsoLocked(tso) &&
+ allow_migrate &&
+ RtsFlags.ParFlags.wakeupMigrate)) {
+ // We are waking up this thread on the current Capability, which
+ // might involve migrating it from the Capability it was last on.
+ if (tso->bound) {
+ ASSERT(tso->bound->cap == tso->cap);
+ tso->bound->cap = cap;
+ }
+ tso->cap = cap;
+ appendToRunQueue(cap,tso);
+ // we're holding a newly woken thread, make sure we context switch
+ // quickly so we can migrate it if necessary.
+ context_switch = 1;
+ } else {
+ // we'll try to wake it up on the Capability it was last on.
+ wakeupThreadOnCapability_lock(tso->cap, tso);
+ }
+#else
+ appendToRunQueue(cap,tso);
+ context_switch = 1;
+#endif
+
+ debugTrace(DEBUG_sched,
+ "waking up thread %ld on cap %d",
+ (long)tso->id, tso->cap->no);
+
+ return next;
+}
+
+/* ----------------------------------------------------------------------------
+ awakenBlockedQueue
+
+ wakes up all the threads on the specified queue.
+ ------------------------------------------------------------------------- */
+
+#if defined(GRAN)
+void
+awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
+{
+ StgBlockingQueueElement *bqe;
+ PEs node_loc;
+ nat len = 0;
+
+ IF_GRAN_DEBUG(bq,
+		debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n",
+ node, CurrentProc, CurrentTime[CurrentProc],
+ CurrentTSO->id, CurrentTSO));
+
+ node_loc = where_is(node);
+
+ ASSERT(q == END_BQ_QUEUE ||
+ get_itbl(q)->type == TSO || // q is either a TSO or an RBHSave
+ get_itbl(q)->type == CONSTR); // closure (type constructor)
+ ASSERT(is_unique(node));
+
+ /* FAKE FETCH: magically copy the node to the tso's proc;
+ no Fetch necessary because in reality the node should not have been
+ moved to the other PE in the first place
+ */
+ if (CurrentProc!=node_loc) {
+ IF_GRAN_DEBUG(bq,
+ debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
+ node, node_loc, CurrentProc, CurrentTSO->id,
+ // CurrentTSO, where_is(CurrentTSO),
+ node->header.gran.procs));
+ node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
+ IF_GRAN_DEBUG(bq,
+ debugBelch("## new bitmask of node %p is %#x\n",
+ node, node->header.gran.procs));
+ if (RtsFlags.GranFlags.GranSimStats.Global) {
+ globalGranStats.tot_fake_fetches++;
+ }
+ }
+
+ bqe = q;
+ // ToDo: check: ASSERT(CurrentProc==node_loc);
+ while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
+ //next = bqe->link;
+ /*
+ bqe points to the current element in the queue
+ next points to the next element in the queue
+ */
+ //tso = (StgTSO *)bqe; // wastes an assignment to get the type right
+ //tso_loc = where_is(tso);
+ len++;
+ bqe = unblockOne(bqe, node);
+ }
+
+ /* if this is the BQ of an RBH, we have to put back the info ripped out of
+ the closure to make room for the anchor of the BQ */
+ if (bqe!=END_BQ_QUEUE) {
+ ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
+ /*
+ ASSERT((info_ptr==&RBH_Save_0_info) ||
+ (info_ptr==&RBH_Save_1_info) ||
+ (info_ptr==&RBH_Save_2_info));
+ */
+ /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
+ ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
+ ((StgRBH *)node)->mut_link = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
+
+ IF_GRAN_DEBUG(bq,
+ debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
+ node, info_type(node)));
+ }
+
+ /* statistics gathering */
+ if (RtsFlags.GranFlags.GranSimStats.Global) {
+ // globalGranStats.tot_bq_processing_time += bq_processing_time;
+ globalGranStats.tot_bq_len += len; // total length of all bqs awakened
+ // globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only
+ globalGranStats.tot_awbq++; // total no. of bqs awakened
+ }
+ IF_GRAN_DEBUG(bq,
+ debugBelch("## BQ Stats of %p: [%d entries] %s\n",
+ node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
+}
+#elif defined(PARALLEL_HASKELL)
+void
+awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
+{
+ StgBlockingQueueElement *bqe;
+
+ IF_PAR_DEBUG(verbose,
+ debugBelch("##-_ AwBQ for node %p on [%x]: \n",
+ node, mytid));
+#ifdef DIST
+ //RFP
+ if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
+    IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so let's just return. RFP (BUG?)\n"));
+ return;
+ }
+#endif
+
+ ASSERT(q == END_BQ_QUEUE ||
+ get_itbl(q)->type == TSO ||
+ get_itbl(q)->type == BLOCKED_FETCH ||
+ get_itbl(q)->type == CONSTR);
+
+ bqe = q;
+ while (get_itbl(bqe)->type==TSO ||
+ get_itbl(bqe)->type==BLOCKED_FETCH) {
+ bqe = unblockOne(bqe, node);
+ }
+}
+
+#else /* !GRAN && !PARALLEL_HASKELL */
+
+void
+awakenBlockedQueue(Capability *cap, StgTSO *tso)
+{
+ while (tso != END_TSO_QUEUE) {
+ tso = unblockOne(cap,tso);
+ }
+}
+#endif
+
+
+/* ---------------------------------------------------------------------------
+ * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
+ * used by Control.Concurrent for error checking.
+ * ------------------------------------------------------------------------- */
+
+StgBool
+rtsSupportsBoundThreads(void)
+{
+#if defined(THREADED_RTS)
+ return rtsTrue;
+#else
+ return rtsFalse;
+#endif
+}
+
+/* ---------------------------------------------------------------------------
+ * isThreadBound(tso): check whether tso is bound to an OS thread.
+ * ------------------------------------------------------------------------- */
+
+StgBool
+isThreadBound(StgTSO* tso USED_IF_THREADS)
+{
+#if defined(THREADED_RTS)
+ return (tso->bound != NULL);
+#endif
+ return rtsFalse;
+}
+
+/* ----------------------------------------------------------------------------
+ * Debugging: why is a thread blocked
+ * ------------------------------------------------------------------------- */
+
+#if defined(DEBUG)
+void
+printThreadBlockage(StgTSO *tso)
+{
+ switch (tso->why_blocked) {
+ case BlockedOnRead:
+ debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
+ break;
+ case BlockedOnWrite:
+ debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
+ break;
+#if defined(mingw32_HOST_OS)
+ case BlockedOnDoProc:
+ debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
+ break;
+#endif
+ case BlockedOnDelay:
+ debugBelch("is blocked until %ld", (long)(tso->block_info.target));
+ break;
+ case BlockedOnMVar:
+ debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
+ break;
+ case BlockedOnException:
+ debugBelch("is blocked on delivering an exception to thread %d",
+ tso->block_info.tso->id);
+ break;
+ case BlockedOnBlackHole:
+ debugBelch("is blocked on a black hole");
+ break;
+ case NotBlocked:
+ debugBelch("is not blocked");
+ break;
+#if defined(PARALLEL_HASKELL)
+ case BlockedOnGA:
+ debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
+ tso->block_info.closure, info_type(tso->block_info.closure));
+ break;
+ case BlockedOnGA_NoSend:
+ debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
+ tso->block_info.closure, info_type(tso->block_info.closure));
+ break;
+#endif
+ case BlockedOnCCall:
+ debugBelch("is blocked on an external call");
+ break;
+ case BlockedOnCCall_NoUnblockExc:
+ debugBelch("is blocked on an external call (exceptions were already blocked)");
+ break;
+ case BlockedOnSTM:
+ debugBelch("is blocked on an STM operation");
+ break;
+ default:
+    barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%p)",
+	 tso->why_blocked, tso->id, tso);
+ }
+}
+
+void
+printThreadStatus(StgTSO *t)
+{
+ debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
+ {
+ void *label = lookupThreadLabel(t->id);
+ if (label) debugBelch("[\"%s\"] ",(char *)label);
+ }
+ if (t->what_next == ThreadRelocated) {
+ debugBelch("has been relocated...\n");
+ } else {
+ switch (t->what_next) {
+ case ThreadKilled:
+ debugBelch("has been killed");
+ break;
+ case ThreadComplete:
+ debugBelch("has completed");
+ break;
+ default:
+ printThreadBlockage(t);
+ }
+ debugBelch("\n");
+ }
+}
+
+void
+printAllThreads(void)
+{
+ StgTSO *t, *next;
+ nat i;
+ Capability *cap;
+
+# if defined(GRAN)
+ char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
+ ullong_format_string(TIME_ON_PROC(CurrentProc),
+ time_string, rtsFalse/*no commas!*/);
+
+ debugBelch("all threads at [%s]:\n", time_string);
+# elif defined(PARALLEL_HASKELL)
+ char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
+ ullong_format_string(CURRENT_TIME,
+ time_string, rtsFalse/*no commas!*/);
+
+ debugBelch("all threads at [%s]:\n", time_string);
+# else
+ debugBelch("all threads:\n");
+# endif
+
+ for (i = 0; i < n_capabilities; i++) {
+ cap = &capabilities[i];
+ debugBelch("threads on capability %d:\n", cap->no);
+ for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
+ printThreadStatus(t);
+ }
+ }
+
+ debugBelch("other threads:\n");
+ for (t = all_threads; t != END_TSO_QUEUE; t = next) {
+ if (t->why_blocked != NotBlocked) {
+ printThreadStatus(t);
+ }
+ if (t->what_next == ThreadRelocated) {
+ next = t->link;
+ } else {
+ next = t->global_link;
+ }
+ }
+}
+
+// useful from gdb
+void
+printThreadQueue(StgTSO *t)
+{
+ nat i = 0;
+ for (; t != END_TSO_QUEUE; t = t->link) {
+ printThreadStatus(t);
+ i++;
+ }
+ debugBelch("%d threads on queue\n", i);
+}
+
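+/* For example, from gdb one can dump the run queue of the first
+ * capability with (sketch):
+ *
+ *   (gdb) call printThreadQueue(capabilities[0].run_queue_hd)
+ */
+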
+/*
+ Print a whole blocking queue attached to node (debugging only).
+*/
+# if defined(PARALLEL_HASKELL)
+void
+print_bq (StgClosure *node)
+{
+  ASSERT(node != (StgClosure *)NULL); // sanity check, before node is dereferenced
+
+  /* should cover all closures that may have a blocking queue */
+  ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
+	 get_itbl(node)->type == FETCH_ME_BQ ||
+	 get_itbl(node)->type == RBH ||
+	 get_itbl(node)->type == MVAR);
+
+  debugBelch("## BQ of closure %p (%s): ",
+	     node, info_type(node));
+
+ print_bqe(((StgBlockingQueue*)node)->blocking_queue);
+}
+
+/*
+ Print a whole blocking queue starting with the element bqe.
+*/
+void
+print_bqe (StgBlockingQueueElement *bqe)
+{
+ rtsBool end;
+
+ /*
+ NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
+ */
+ for (end = (bqe==END_BQ_QUEUE);
+ !end; // iterate until bqe points to a CONSTR
+ end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE),
+ bqe = end ? END_BQ_QUEUE : bqe->link) {
+ ASSERT(bqe != END_BQ_QUEUE); // sanity check
+ ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
+ /* types of closures that may appear in a blocking queue */
+ ASSERT(get_itbl(bqe)->type == TSO ||
+ get_itbl(bqe)->type == BLOCKED_FETCH ||
+ get_itbl(bqe)->type == CONSTR);
+ /* only BQs of an RBH end with an RBH_Save closure */
+ //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
+
+ switch (get_itbl(bqe)->type) {
+ case TSO:
+      debugBelch(" TSO %u (%p),",
+		 ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
+ break;
+ case BLOCKED_FETCH:
+ debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
+ ((StgBlockedFetch *)bqe)->node,
+ ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
+ ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
+ ((StgBlockedFetch *)bqe)->ga.weight);
+ break;
+ case CONSTR:
+ debugBelch(" %s (IP %p),",
+ (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
+ get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
+ get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
+ "RBH_Save_?"), get_itbl(bqe));
+ break;
+ default:
+ barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
+ info_type((StgClosure *)bqe)); // , node, info_type(node));
+ break;
+ }
+ } /* for */
+ debugBelch("\n");
+}
+# elif defined(GRAN)
+void
+print_bq (StgClosure *node)
+{
+ StgBlockingQueueElement *bqe;
+ PEs node_loc, tso_loc;
+ rtsBool end;
+
+  ASSERT(node != (StgClosure *)NULL); // sanity check, before node is dereferenced
+
+  /* should cover all closures that may have a blocking queue */
+  ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
+	 get_itbl(node)->type == FETCH_ME_BQ ||
+	 get_itbl(node)->type == RBH);
+
+  node_loc = where_is(node);
+
+ debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
+ node, info_type(node), node_loc);
+
+ /*
+ NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
+ */
+ for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
+ !end; // iterate until bqe points to a CONSTR
+ end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
+ ASSERT(bqe != END_BQ_QUEUE); // sanity check
+ ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
+ /* types of closures that may appear in a blocking queue */
+ ASSERT(get_itbl(bqe)->type == TSO ||
+ get_itbl(bqe)->type == CONSTR);
+ /* only BQs of an RBH end with an RBH_Save closure */
+ ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
+
+ tso_loc = where_is((StgClosure *)bqe);
+ switch (get_itbl(bqe)->type) {
+ case TSO:
+ debugBelch(" TSO %d (%p) on [PE %d],",
+ ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
+ break;
+ case CONSTR:
+ debugBelch(" %s (IP %p),",
+ (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
+ get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
+ get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
+ "RBH_Save_?"), get_itbl(bqe));
+ break;
+ default:
+ barf("Unexpected closure type %s in blocking queue of %p (%s)",
+ info_type((StgClosure *)bqe), node, info_type(node));
+ break;
+ }
+ } /* for */
+ debugBelch("\n");
+}
+# endif
+
+#if defined(PARALLEL_HASKELL)
+nat
+run_queue_len(void)
+{
+ nat i;
+ StgTSO *tso;
+
+ for (i=0, tso=run_queue_hd;
+ tso != END_TSO_QUEUE;
+ i++, tso=tso->link) {
+ /* nothing */
+ }
+
+ return i;
+}
+#endif
+
+#endif /* DEBUG */