/* ---------------------------------------------------------------------------
 *
 * (c) The GHC Team, 2006
 *
 * Thread-related functionality
 *
 * --------------------------------------------------------------------------*/

#include "PosixSource.h"
#include "Rts.h"

#include "Capability.h"
#include "Updates.h"
#include "Threads.h"
#include "STM.h"
#include "Schedule.h"
#include "Trace.h"
#include "ThreadLabels.h"
#include "Updates.h"
#include "Messages.h"
#include "RaiseAsync.h"
#include "Prelude.h"
#include "Printer.h"
#include "sm/Sanity.h"
#include "sm/Storage.h"

#include <string.h>

/* Next thread ID to allocate.
 * LOCK: sched_mutex
 */
static StgThreadID next_thread_id = 1;

/* The smallest stack size that makes any sense is:
 *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
 *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
 *  + 1                       (the closure to enter)
 *  + 1                       (stg_ap_v_ret)
 *  + 1                       (spare slot req'd by stg_ap_v_ret)
 *
 * A thread with this stack will bomb immediately with a stack
 * overflow, which will increase its stack size.
 */
#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)

/* ---------------------------------------------------------------------------
   Create a new thread.

   The new thread starts with the given stack size.  Before the
   scheduler can run, however, this thread needs to have a closure
   (and possibly some arguments) pushed on its stack.  See
   pushClosure() in Schedule.h.

   createGenThread() and createIOThread() (in SchedAPI.h) are
   convenient packaged versions of this function.
   ------------------------------------------------------------------------ */
StgTSO *
createThread(Capability *cap, W_ size)
{
    StgTSO *tso;
    StgStack *stack;
    uint32_t stack_size;

    /* sched_mutex is *not* required */

    /* catch ridiculously small stack sizes */
    if (size < MIN_STACK_WORDS + sizeofW(StgStack) + sizeofW(StgTSO)) {
        size = MIN_STACK_WORDS + sizeofW(StgStack) + sizeofW(StgTSO);
    }

    /* The size argument we are given includes all the per-thread
     * overheads:
     *
     *    - The TSO structure
     *    - The STACK header
     *
     * This is so that we can use a nice round power of 2 for the
     * default stack size (e.g. 1k), and if we're allocating lots of
     * threads back-to-back they'll fit nicely in a block.  It's a bit
     * of a benchmark hack, but it doesn't do any harm.
     */
    stack_size = round_to_mblocks(size - sizeofW(StgTSO));
    stack = (StgStack *)allocate(cap, stack_size);
    TICK_ALLOC_STACK(stack_size);
    SET_HDR(stack, &stg_STACK_info, cap->r.rCCCS);
    stack->stack_size   = stack_size - sizeofW(StgStack);
    stack->sp           = stack->stack + stack->stack_size;
    stack->dirty        = 1;

    tso = (StgTSO *)allocate(cap, sizeofW(StgTSO));
    TICK_ALLOC_TSO();
    SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);

    // Always start with the compiled code evaluator
    tso->what_next = ThreadRunGHC;
    tso->why_blocked  = NotBlocked;
    tso->block_info.closure = (StgClosure *)END_TSO_QUEUE;
    tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
    tso->bq = (StgBlockingQueue *)END_TSO_QUEUE;
    tso->flags = 0;
    tso->dirty = 1;
    tso->_link = END_TSO_QUEUE;

    tso->saved_errno = 0;
    tso->bound = NULL;
    tso->cap = cap;

    tso->stackobj       = stack;
    tso->tot_stack_size = stack->stack_size;

    ASSIGN_Int64((W_*)&(tso->alloc_limit), 0);

    tso->trec = NO_TREC;

#if defined(PROFILING)
    tso->prof.cccs = CCS_MAIN;
#endif

    // put a stop frame on the stack
    stack->sp -= sizeofW(StgStopFrame);
    SET_HDR((StgClosure*)stack->sp,
            (StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);

    /* Link the new thread on the global thread list.
     */
    ACQUIRE_LOCK(&sched_mutex);
    tso->id = next_thread_id++;  // while we have the mutex
    tso->global_link = g0->threads;
    g0->threads = tso;
    RELEASE_LOCK(&sched_mutex);

    // ToDo: report the stack size in the event?
    traceEventCreateThread(cap, tso);

    return tso;
}
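
/* For illustration only (a sketch, not code that is compiled here):
 * the packaged versions mentioned above wrap createThread() roughly
 * like this, pushing an entry frame and the closure to run before the
 * scheduler first sees the thread:
 *
 *     StgTSO *t = createThread(cap, stack_size);
 *     pushClosure(t, (W_)&stg_ap_v_info);   // apply to a void argument
 *     pushClosure(t, (W_)closure);          // the closure to run
 *     pushClosure(t, (W_)&stg_enter_info);  // enter the closure
 *     scheduleThread(cap, t);
 *
 * The exact frames pushed depend on the kind of thread being created;
 * see the create*Thread() wrappers for the definitive versions.
 */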

/* ---------------------------------------------------------------------------
 * Comparing Thread ids.
 *
 * This is used from STG land in the implementation of the
 * instances of Eq/Ord for ThreadIds.
 * ------------------------------------------------------------------------ */

int
cmp_thread(StgPtr tso1, StgPtr tso2)
{
  StgThreadID id1 = ((StgTSO *)tso1)->id;
  StgThreadID id2 = ((StgTSO *)tso2)->id;

  if (id1 < id2) return (-1);
  if (id1 > id2) return 1;
  return 0;
}

/* ---------------------------------------------------------------------------
 * Fetching the ThreadID from an StgTSO.
 *
 * This is used in the implementation of Show for ThreadIds.
 * ------------------------------------------------------------------------ */
int
rts_getThreadId(StgPtr tso)
{
  return ((StgTSO *)tso)->id;
}

/* ---------------------------------------------------------------------------
 * Enabling and disabling the thread allocation limit
 * ------------------------------------------------------------------------ */

void rts_enableThreadAllocationLimit(StgPtr tso)
{
    ((StgTSO *)tso)->flags |= TSO_ALLOC_LIMIT;
}

void rts_disableThreadAllocationLimit(StgPtr tso)
{
    ((StgTSO *)tso)->flags &= ~TSO_ALLOC_LIMIT;
}

/* -----------------------------------------------------------------------------
   Remove a thread from a queue.
   Fails fatally if the TSO is not on the queue.
   -------------------------------------------------------------------------- */

bool // returns true if we modified the head of the queue
removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
{
    StgTSO *t, *prev;

    prev = NULL;
    for (t = *queue; t != END_TSO_QUEUE; prev = t, t = t->_link) {
        if (t == tso) {
            if (prev) {
                setTSOLink(cap,prev,t->_link);
                t->_link = END_TSO_QUEUE;
                return false;
            } else {
                *queue = t->_link;
                t->_link = END_TSO_QUEUE;
                return true;
            }
        }
    }
    barf("removeThreadFromQueue: not found");
}

bool // returns true if we modified head or tail
removeThreadFromDeQueue (Capability *cap,
                         StgTSO **head, StgTSO **tail, StgTSO *tso)
{
    StgTSO *t, *prev;
    bool flag = false;

    prev = NULL;
    for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->_link) {
        if (t == tso) {
            if (prev) {
                setTSOLink(cap,prev,t->_link);
                flag = false;
            } else {
                *head = t->_link;
                flag = true;
            }
            t->_link = END_TSO_QUEUE;
            if (*tail == tso) {
                if (prev) {
                    *tail = prev;
                } else {
                    *tail = END_TSO_QUEUE;
                }
                return true;
            } else {
                return flag;
            }
        }
    }
    barf("removeThreadFromDeQueue: not found");
}

/* ----------------------------------------------------------------------------
   tryWakeupThread()

   Attempt to wake up a thread.  tryWakeupThread is idempotent: it is
   always safe to call it too many times, but it is not safe in
   general to omit a call.

   ------------------------------------------------------------------------- */

void
tryWakeupThread (Capability *cap, StgTSO *tso)
{
    traceEventThreadWakeup (cap, tso, tso->cap->no);

#if defined(THREADED_RTS)
    if (tso->cap != cap)
    {
        MessageWakeup *msg;
        msg = (MessageWakeup *)allocate(cap,sizeofW(MessageWakeup));
        SET_HDR(msg, &stg_MSG_TRY_WAKEUP_info, CCS_SYSTEM);
        msg->tso = tso;
        sendMessage(cap, tso->cap, (Message*)msg);
        debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld on cap %d",
                      (W_)tso->id, tso->cap->no);
        return;
    }
#endif

    switch (tso->why_blocked)
    {
    case BlockedOnMVar:
    case BlockedOnMVarRead:
    {
        if (tso->_link == END_TSO_QUEUE) {
            tso->block_info.closure = (StgClosure*)END_TSO_QUEUE;
            goto unblock;
        } else {
            return;
        }
    }

    case BlockedOnMsgThrowTo:
    {
        const StgInfoTable *i;

        i = lockClosure(tso->block_info.closure);
        unlockClosure(tso->block_info.closure, i);
        if (i != &stg_MSG_NULL_info) {
            debugTraceCap(DEBUG_sched, cap, "thread %ld still blocked on throwto (%p)",
                          (W_)tso->id, tso->block_info.throwto->header.info);
            return;
        }

        // remove the block frame from the stack
        ASSERT(tso->stackobj->sp[0] == (StgWord)&stg_block_throwto_info);
        tso->stackobj->sp += 3;
        goto unblock;
    }

    case BlockedOnSTM:
        tso->block_info.closure = &stg_STM_AWOKEN_closure;
        goto unblock;

    case BlockedOnBlackHole:
    case ThreadMigrating:
        goto unblock;

    default:
        // otherwise, do nothing
        return;
    }

unblock:
    // Just run the thread now; if the BH is not really available,
    // we'll block again.
    tso->why_blocked = NotBlocked;
    appendToRunQueue(cap,tso);

    // We used to set the context switch flag here, which would
    // trigger a context switch a short time in the future (at the end
    // of the current nursery block).  The idea is that we have just
    // woken up a thread, so we may need to load-balance and migrate
    // threads to other CPUs.  On the other hand, setting the context
    // switch flag here unfairly penalises the current thread by
    // yielding its time slice too early.
    //
    // The synthetic benchmark nofib/smp/chan can be used to show the
    // difference quite clearly.

    // cap->context_switch = 1;
}

/* ----------------------------------------------------------------------------
   migrateThread
   ------------------------------------------------------------------------- */

void
migrateThread (Capability *from, StgTSO *tso, Capability *to)
{
    traceEventMigrateThread (from, tso, to->no);
    // ThreadMigrating tells the target cap that it needs to be added to
    // the run queue when it receives the MSG_TRY_WAKEUP.
    tso->why_blocked = ThreadMigrating;
    tso->cap = to;
    tryWakeupThread(from, tso);
}

/* ----------------------------------------------------------------------------
   wakeBlockingQueue

   Wakes up all the threads blocked on the given BLOCKING_QUEUE.
   ------------------------------------------------------------------------- */

static void
wakeBlockingQueue(Capability *cap, StgBlockingQueue *bq)
{
    MessageBlackHole *msg;
    const StgInfoTable *i;

    ASSERT(bq->header.info == &stg_BLOCKING_QUEUE_DIRTY_info  ||
           bq->header.info == &stg_BLOCKING_QUEUE_CLEAN_info  );

    for (msg = bq->queue; msg != (MessageBlackHole*)END_TSO_QUEUE;
         msg = msg->link) {
        i = msg->header.info;
        if (i != &stg_IND_info) {
            ASSERT(i == &stg_MSG_BLACKHOLE_info);
            tryWakeupThread(cap,msg->tso);
        }
    }

    // overwrite the BQ with an indirection so it will be
    // collected at the next GC.
    OVERWRITE_INFO(bq, &stg_IND_info);
}

// If we update a closure that we know we BLACKHOLE'd, and the closure
// no longer points to the current TSO as its owner, then there may be
// an orphaned BLOCKING_QUEUE closure with blocked threads attached to
// it.  We therefore traverse the BLOCKING_QUEUEs attached to the
// current TSO to see if any can now be woken up.
void
checkBlockingQueues (Capability *cap, StgTSO *tso)
{
    StgBlockingQueue *bq, *next;
    StgClosure *p;

    debugTraceCap(DEBUG_sched, cap,
                  "collision occurred; checking blocking queues for thread %ld",
                  (W_)tso->id);

    for (bq = tso->bq; bq != (StgBlockingQueue*)END_TSO_QUEUE; bq = next) {
        next = bq->link;

        if (bq->header.info == &stg_IND_info) {
            // ToDo: could short it out right here, to avoid
            // traversing this IND multiple times.
            continue;
        }

        p = bq->bh;

        if (p->header.info != &stg_BLACKHOLE_info ||
            ((StgInd *)p)->indirectee != (StgClosure*)bq)
        {
            wakeBlockingQueue(cap,bq);
        }
    }
}

/* ----------------------------------------------------------------------------
   updateThunk

   Update a thunk with a value.  In order to do this, we need to know
   which TSO owns (or is evaluating) the thunk, in case we need to
   awaken any threads that are blocked on it.
   ------------------------------------------------------------------------- */

void
updateThunk (Capability *cap, StgTSO *tso, StgClosure *thunk, StgClosure *val)
{
    StgClosure *v;
    StgTSO *owner;
    const StgInfoTable *i;

    i = thunk->header.info;
    if (i != &stg_BLACKHOLE_info &&
        i != &stg_CAF_BLACKHOLE_info &&
        i != &__stg_EAGER_BLACKHOLE_info &&
        i != &stg_WHITEHOLE_info) {
        updateWithIndirection(cap, thunk, val);
        return;
    }

    // v is whatever the blackhole's indirectee points to: usually the
    // owning TSO, or a BLOCKING_QUEUE if other threads have blocked on
    // the thunk.
    v = UNTAG_CLOSURE(((StgInd*)thunk)->indirectee);

    updateWithIndirection(cap, thunk, val);

    // sometimes the TSO is locked when we reach here, so its header
    // might be WHITEHOLE.  Hence check for the correct owner using
    // pointer equality first.
    if ((StgTSO*)v == tso) {
        return;
    }

    i = v->header.info;
    if (i == &stg_TSO_info) {
        checkBlockingQueues(cap, tso);
        return;
    }

    if (i != &stg_BLOCKING_QUEUE_CLEAN_info &&
        i != &stg_BLOCKING_QUEUE_DIRTY_info) {
        checkBlockingQueues(cap, tso);
        return;
    }

    owner = ((StgBlockingQueue*)v)->owner;

    if (owner != tso) {
        checkBlockingQueues(cap, tso);
    } else {
        wakeBlockingQueue(cap, (StgBlockingQueue*)v);
    }
}

/* ---------------------------------------------------------------------------
 * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
 * Used by Control.Concurrent for error checking.
 * ------------------------------------------------------------------------- */

HsBool
rtsSupportsBoundThreads(void)
{
#if defined(THREADED_RTS)
  return HS_BOOL_TRUE;
#else
  return HS_BOOL_FALSE;
#endif
}

/* ---------------------------------------------------------------------------
 * isThreadBound(tso): check whether tso is bound to an OS thread.
 * ------------------------------------------------------------------------- */

StgBool
isThreadBound(StgTSO* tso USED_IF_THREADS)
{
#if defined(THREADED_RTS)
  return (tso->bound != NULL);
#endif
  return false;
}

/* -----------------------------------------------------------------------------
   Stack overflow

   If the thread has reached its maximum stack size, then raise the
   StackOverflow exception in the offending thread.  Otherwise
   relocate the TSO into a larger chunk of memory and adjust its stack
   size appropriately.
   -------------------------------------------------------------------------- */

void
threadStackOverflow (Capability *cap, StgTSO *tso)
{
    StgStack *new_stack, *old_stack;
    StgUnderflowFrame *frame;
    W_ chunk_size;

    IF_DEBUG(sanity,checkTSO(tso));

    if (RtsFlags.GcFlags.maxStkSize > 0
        && tso->tot_stack_size >= RtsFlags.GcFlags.maxStkSize) {
        // #3677: In a stack overflow situation, stack squeezing may
        // reduce the stack size, but we don't know whether it has been
        // reduced enough for the stack check to succeed if we try
        // again.  Fortunately stack squeezing is idempotent, so all we
        // need to do is record whether *any* squeezing happened.  If we
        // are at the stack's absolute -K limit, and stack squeezing
        // happened, then we try running the thread again.  The
        // TSO_SQUEEZED flag is set by threadPaused() to tell us whether
        // squeezing happened or not.
        if (tso->flags & TSO_SQUEEZED) {
            return;
        }

        debugTrace(DEBUG_gc,
                   "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
                   (long)tso->id, tso, (long)tso->stackobj->stack_size,
                   (long)RtsFlags.GcFlags.maxStkSize);
        IF_DEBUG(gc,
                 /* If we're debugging, just print out the top of the stack */
                 printStackChunk(tso->stackobj->sp,
                                 stg_min(tso->stackobj->stack + tso->stackobj->stack_size,
                                         tso->stackobj->sp+64)));

        // Note [Throw to self when masked], also #767 and #8303.
        throwToSelf(cap, tso, (StgClosure *)stackOverflow_closure);
        return;
    }


    // We also want to avoid enlarging the stack if squeezing has
    // already released some of it.  However, we don't want to get into
    // a pathological situation where a thread has a nearly full stack
    // (near its current limit, but not near the absolute -K limit),
    // keeps allocating a little bit, squeezing removes a little bit,
    // and then it runs again.  So to avoid this, if we squeezed *and*
    // there is still less than BLOCK_SIZE_W words free, then we enlarge
    // the stack anyway.
    //
    // NB: This reasoning only applies if the stack has been squeezed;
    // if no squeezing has occurred, then BLOCK_SIZE_W free space does
    // not mean there is enough stack to run; the thread may have
    // requested a large amount of stack (see below).  If the amount
    // we squeezed is not enough to run the thread, we'll come back
    // here (no squeezing will have occurred, and thus we'll enlarge
    // the stack).
    if ((tso->flags & TSO_SQUEEZED) &&
        ((W_)(tso->stackobj->sp - tso->stackobj->stack) >= BLOCK_SIZE_W)) {
        return;
    }

    old_stack = tso->stackobj;

    // If we used less than half of the previous stack chunk, then we
    // must have failed a stack check for a large amount of stack.  In
    // this case we allocate a double-sized chunk to try to
    // accommodate the large stack request.  If that also fails, the
    // next chunk will be 4x normal size, and so on.
    //
    // It would be better to have the mutator tell us how much stack
    // was needed, as we do with heap allocations, but this works for
    // now.
    //
    if (old_stack->sp > old_stack->stack + old_stack->stack_size / 2)
    {
        chunk_size = stg_max(2 * (old_stack->stack_size + sizeofW(StgStack)),
                             RtsFlags.GcFlags.stkChunkSize);
    }
    else
    {
        chunk_size = RtsFlags.GcFlags.stkChunkSize;
    }

    debugTraceCap(DEBUG_sched, cap,
                  "allocating new stack chunk of size %d bytes",
                  chunk_size * sizeof(W_));

    // Charge the current thread for allocating stack.  Stack usage is
    // non-deterministic, because the chunk boundaries might vary from
    // run to run, but accounting for this is better than not
    // accounting for it, since a deep recursion will otherwise not be
    // subject to allocation limits.
    cap->r.rCurrentTSO = tso;
    new_stack = (StgStack*) allocate(cap, chunk_size);
    cap->r.rCurrentTSO = NULL;

    SET_HDR(new_stack, &stg_STACK_info, old_stack->header.prof.ccs);
    TICK_ALLOC_STACK(chunk_size);

    new_stack->dirty = 0; // begin clean, we'll mark it dirty below
    new_stack->stack_size = chunk_size - sizeofW(StgStack);
    new_stack->sp = new_stack->stack + new_stack->stack_size;

    tso->tot_stack_size += new_stack->stack_size;

    {
        StgWord *sp;
        W_ chunk_words, size;

        // find the boundary of the chunk of old stack we're going to
        // copy to the new stack.  We skip over stack frames until we
        // reach the smaller of
        //
        //   * the chunk buffer size (+RTS -kb)
        //   * the end of the old stack
        //
        for (sp = old_stack->sp;
             sp < stg_min(old_stack->sp + RtsFlags.GcFlags.stkChunkBufferSize,
                          old_stack->stack + old_stack->stack_size); )
        {
            size = stack_frame_sizeW((StgClosure*)sp);

            // if including this frame would exceed the size of the
            // new stack (taking into account the underflow frame),
            // then stop at the previous frame.
            if (sp + size > old_stack->sp + (new_stack->stack_size -
                                             sizeofW(StgUnderflowFrame))) {
                break;
            }
            sp += size;
        }

        if (sp == old_stack->stack + old_stack->stack_size) {
            //
            // the old stack chunk is now empty, so we do *not* insert
            // an underflow frame pointing back to it.  There are two
            // cases: either the old stack chunk was the last one, in
            // which case it ends with a STOP_FRAME, or it is not the
            // last one, and it already ends with an UNDERFLOW_FRAME
            // pointing to the previous chunk.  In the latter case, we
            // will copy the UNDERFLOW_FRAME into the new stack chunk.
            // In both cases, the old chunk will be subsequently GC'd.
            //
            // With the default settings, -ki1k -kb1k, this means the
            // first stack chunk will be discarded after the first
            // overflow, being replaced by a non-moving 32k chunk.
            //
        } else {
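            // Otherwise, push an underflow frame that links the new
            // chunk back to the old one, so that when the thread
            // unwinds past the copied frames it continues on the old
            // chunk.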
            new_stack->sp -= sizeofW(StgUnderflowFrame);
            frame = (StgUnderflowFrame*)new_stack->sp;
            frame->info = &stg_stack_underflow_frame_info;
            frame->next_chunk  = old_stack;
        }

        // copy the stack frames between old_stack->sp and sp to
        //   new_stack->sp - (sp - old_stack->sp)
        chunk_words = sp - old_stack->sp;

        memcpy(/* dest   */ new_stack->sp - chunk_words,
               /* source */ old_stack->sp,
               /* size   */ chunk_words * sizeof(W_));

        old_stack->sp += chunk_words;
        new_stack->sp -= chunk_words;
    }

    tso->stackobj = new_stack;

    // we're about to run it, better mark it dirty
    dirty_STACK(cap, new_stack);

    IF_DEBUG(sanity,checkTSO(tso));
    // IF_DEBUG(scheduler,printTSO(new_tso));
}



/* ---------------------------------------------------------------------------
   Stack underflow - called from the stg_stack_underflow_frame
   ------------------------------------------------------------------------ */

W_ // returns offset to the return address
threadStackUnderflow (Capability *cap, StgTSO *tso)
{
    StgStack *new_stack, *old_stack;
    StgUnderflowFrame *frame;
    uint32_t retvals;

    debugTraceCap(DEBUG_sched, cap, "stack underflow");

    old_stack = tso->stackobj;

    frame = (StgUnderflowFrame*)(old_stack->stack + old_stack->stack_size
                                 - sizeofW(StgUnderflowFrame));
    ASSERT(frame->info == &stg_stack_underflow_frame_info);

    new_stack = (StgStack*)frame->next_chunk;
    tso->stackobj = new_stack;

    // Any words between the old stack pointer and the underflow frame
    // are return values that must be carried down to the chunk we are
    // returning to.
    retvals = (P_)frame - old_stack->sp;
    if (retvals != 0)
    {
        // we have some return values to copy to new_stack
        if ((W_)(new_stack->sp - new_stack->stack) < retvals)
        {
            barf("threadStackUnderflow: not enough space for return values");
        }

        new_stack->sp -= retvals;

        memcpy(/* dest */ new_stack->sp,
               /* src  */ old_stack->sp,
               /* size */ retvals * sizeof(W_));
    }

    // empty the old stack.  The GC may still visit this object
    // because it is on the mutable list.
    old_stack->sp = old_stack->stack + old_stack->stack_size;

    // restore the stack parameters, and update tot_stack_size
    tso->tot_stack_size -= old_stack->stack_size;

    // we're about to run it, better mark it dirty
    dirty_STACK(cap, new_stack);

    return retvals;
}

/* ----------------------------------------------------------------------------
   Implementation of tryPutMVar#

   NOTE: this should be kept in sync with stg_tryPutMVarzh in PrimOps.cmm
   ------------------------------------------------------------------------- */

bool performTryPutMVar(Capability *cap, StgMVar *mvar, StgClosure *value)
{
    const StgInfoTable *info;
    StgMVarTSOQueue *q;
    StgTSO *tso;

    info = lockClosure((StgClosure*)mvar);

    if (mvar->value != &stg_END_TSO_QUEUE_closure) {
#if defined(THREADED_RTS)
        unlockClosure((StgClosure*)mvar, info);
#endif
        return false;
    }

    q = mvar->head;
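    // Walk the queue of blocked takeMVar/readMVar requests, skipping
    // entries that have been deleted (their headers overwritten with
    // stg_IND_info or stg_MSG_NULL_info).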
loop:
    if (q == (StgMVarTSOQueue*)&stg_END_TSO_QUEUE_closure) {
        /* No further takes, the MVar is now full. */
        if (info == &stg_MVAR_CLEAN_info) {
            dirty_MVAR(&cap->r, (StgClosure*)mvar);
        }

        mvar->value = value;
        unlockClosure((StgClosure*)mvar, &stg_MVAR_DIRTY_info);
        return true;
    }
    if (q->header.info == &stg_IND_info ||
        q->header.info == &stg_MSG_NULL_info) {
        q = (StgMVarTSOQueue*)((StgInd*)q)->indirectee;
        goto loop;
    }

    // There are takeMVar(s) waiting: wake up the first one
    tso = q->tso;
    mvar->head = q->link;
    if (mvar->head == (StgMVarTSOQueue*)&stg_END_TSO_QUEUE_closure) {
        mvar->tail = (StgMVarTSOQueue*)&stg_END_TSO_QUEUE_closure;
    }

    ASSERT(tso->block_info.closure == (StgClosure*)mvar);
    // save why_blocked here, because waking up the thread destroys
    // this information
    StgWord why_blocked = tso->why_blocked;

    // actually perform the takeMVar
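    // i.e. make the blocked thread's stack look like a stg_ret_p frame
    // returning `value`, so that when it resumes it behaves as if its
    // takeMVar/readMVar had just returned.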
    StgStack* stack = tso->stackobj;
    stack->sp[1] = (W_)value;
    stack->sp[0] = (W_)&stg_ret_p_info;

    // indicate that the MVar operation has now completed.
    tso->_link = (StgTSO*)&stg_END_TSO_QUEUE_closure;

    if (stack->dirty == 0) {
        dirty_STACK(cap, stack);
    }

    tryWakeupThread(cap, tso);

    // If it was a readMVar, the value we just put has not been
    // consumed, so loop back and wake the next blocked thread.
    // (XXX: This could take a while)
    if (why_blocked == BlockedOnMVarRead) {
        q = ((StgMVarTSOQueue*)q)->link;
        goto loop;
    }

    ASSERT(why_blocked == BlockedOnMVar);

    unlockClosure((StgClosure*)mvar, info);

    return true;
}

/* ----------------------------------------------------------------------------
 * Debugging: why is a thread blocked
 * ------------------------------------------------------------------------- */

#if defined(DEBUG)
void
printThreadBlockage(StgTSO *tso)
{
  switch (tso->why_blocked) {
#if defined(mingw32_HOST_OS)
  case BlockedOnDoProc:
    debugBelch("is blocked on proc (request: %u)", tso->block_info.async_result->reqID);
    break;
#endif
#if !defined(THREADED_RTS)
  case BlockedOnRead:
    debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
    break;
  case BlockedOnWrite:
    debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
    break;
  case BlockedOnDelay:
    debugBelch("is blocked until %ld", (long)(tso->block_info.target));
    break;
#endif
  case BlockedOnMVar:
    debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
    break;
  case BlockedOnMVarRead:
    debugBelch("is blocked on atomic MVar read @ %p", tso->block_info.closure);
    break;
  case BlockedOnBlackHole:
    debugBelch("is blocked on a black hole %p",
               ((StgBlockingQueue*)tso->block_info.bh->bh));
    break;
  case BlockedOnMsgThrowTo:
    debugBelch("is blocked on a throwto message");
    break;
  case NotBlocked:
    debugBelch("is not blocked");
    break;
  case ThreadMigrating:
    debugBelch("is runnable, but not on the run queue");
    break;
  case BlockedOnCCall:
    debugBelch("is blocked on an external call");
    break;
  case BlockedOnCCall_Interruptible:
    debugBelch("is blocked on an external call (but may be interrupted)");
    break;
  case BlockedOnSTM:
    debugBelch("is blocked on an STM operation");
    break;
  default:
    barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%p)",
         tso->why_blocked, tso->id, tso);
  }
}


void
printThreadStatus(StgTSO *t)
{
  debugBelch("\tthread %4lu @ %p ", (unsigned long)t->id, (void *)t);
    {
      void *label = lookupThreadLabel(t->id);
      if (label) debugBelch("[\"%s\"] ",(char *)label);
    }
        switch (t->what_next) {
        case ThreadKilled:
            debugBelch("has been killed");
            break;
        case ThreadComplete:
            debugBelch("has completed");
            break;
        default:
            printThreadBlockage(t);
        }
        if (t->dirty) {
            debugBelch(" (TSO_DIRTY)");
        }
        debugBelch("\n");
}

void
printAllThreads(void)
{
  StgTSO *t, *next;
  uint32_t i, g;
  Capability *cap;

  debugBelch("all threads:\n");

  for (i = 0; i < n_capabilities; i++) {
      cap = capabilities[i];
      debugBelch("threads on capability %d:\n", cap->no);
      for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->_link) {
          printThreadStatus(t);
      }
  }

  debugBelch("other threads:\n");
  for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
    for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
      if (t->why_blocked != NotBlocked) {
          printThreadStatus(t);
      }
      next = t->global_link;
    }
  }
}

// useful from gdb
void
printThreadQueue(StgTSO *t)
{
    uint32_t i = 0;
    for (; t != END_TSO_QUEUE; t = t->_link) {
        printThreadStatus(t);
        i++;
    }
    debugBelch("%d threads on queue\n", i);
}

#endif /* DEBUG */