diff options
Diffstat (limited to 'rts/Messages.c')
-rw-r--r-- | rts/Messages.c | 296 |
1 files changed, 296 insertions, 0 deletions
diff --git a/rts/Messages.c b/rts/Messages.c new file mode 100644 index 0000000000..2b40f76e93 --- /dev/null +++ b/rts/Messages.c @@ -0,0 +1,296 @@ +/* --------------------------------------------------------------------------- + * + * (c) The GHC Team, 2010 + * + * Inter-Capability message passing + * + * --------------------------------------------------------------------------*/ + +#include "Rts.h" +#include "Messages.h" +#include "Trace.h" +#include "Capability.h" +#include "Schedule.h" +#include "Threads.h" +#include "RaiseAsync.h" +#include "sm/Storage.h" + +/* ---------------------------------------------------------------------------- + Send a message to another Capability + ------------------------------------------------------------------------- */ + +#ifdef THREADED_RTS + +void sendMessage(Capability *from_cap, Capability *to_cap, Message *msg) +{ + ACQUIRE_LOCK(&to_cap->lock); + +#ifdef DEBUG + { + const StgInfoTable *i = msg->header.info; + if (i != &stg_MSG_WAKEUP_info && + i != &stg_MSG_THROWTO_info && + i != &stg_MSG_BLACKHOLE_info && + i != &stg_MSG_TRY_WAKEUP_info && + i != &stg_IND_info && // can happen if a MSG_BLACKHOLE is revoked + i != &stg_WHITEHOLE_info) { + barf("sendMessage: %p", i); + } + } +#endif + + msg->link = to_cap->inbox; + to_cap->inbox = msg; + + recordClosureMutated(from_cap,(StgClosure*)msg); + + if (to_cap->running_task == NULL) { + to_cap->running_task = myTask(); + // precond for releaseCapability_() + releaseCapability_(to_cap,rtsFalse); + } else { + contextSwitchCapability(to_cap); + } + + RELEASE_LOCK(&to_cap->lock); +} + +#endif /* THREADED_RTS */ + +/* ---------------------------------------------------------------------------- + Handle a message + ------------------------------------------------------------------------- */ + +#ifdef THREADED_RTS + +void +executeMessage (Capability *cap, Message *m) +{ + const StgInfoTable *i; + +loop: + write_barrier(); // allow m->header to be modified by another thread + i = m->header.info; + if (i == &stg_MSG_WAKEUP_info) + { + // the plan is to eventually get rid of these and use + // TRY_WAKEUP instead. + MessageWakeup *w = (MessageWakeup *)m; + StgTSO *tso = w->tso; + debugTraceCap(DEBUG_sched, cap, "message: wakeup thread %ld", + (lnat)tso->id); + ASSERT(tso->cap == cap); + ASSERT(tso->why_blocked == BlockedOnMsgWakeup); + ASSERT(tso->block_info.closure == (StgClosure *)m); + tso->why_blocked = NotBlocked; + appendToRunQueue(cap, tso); + } + else if (i == &stg_MSG_TRY_WAKEUP_info) + { + StgTSO *tso = ((MessageWakeup *)m)->tso; + debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld", + (lnat)tso->id); + tryWakeupThread(cap, tso); + } + else if (i == &stg_MSG_THROWTO_info) + { + MessageThrowTo *t = (MessageThrowTo *)m; + nat r; + const StgInfoTable *i; + + i = lockClosure((StgClosure*)m); + if (i != &stg_MSG_THROWTO_info) { + unlockClosure((StgClosure*)m, i); + goto loop; + } + + debugTraceCap(DEBUG_sched, cap, "message: throwTo %ld -> %ld", + (lnat)t->source->id, (lnat)t->target->id); + + ASSERT(t->source->why_blocked == BlockedOnMsgThrowTo); + ASSERT(t->source->block_info.closure == (StgClosure *)m); + + r = throwToMsg(cap, t); + + switch (r) { + case THROWTO_SUCCESS: + ASSERT(t->source->sp[0] == (StgWord)&stg_block_throwto_info); + t->source->sp += 3; + unblockOne(cap, t->source); + // this message is done + unlockClosure((StgClosure*)m, &stg_IND_info); + break; + case THROWTO_BLOCKED: + // unlock the message + unlockClosure((StgClosure*)m, &stg_MSG_THROWTO_info); + break; + } + } + else if (i == &stg_MSG_BLACKHOLE_info) + { + nat r; + MessageBlackHole *b = (MessageBlackHole*)m; + + r = messageBlackHole(cap, b); + if (r == 0) { + tryWakeupThread(cap, b->tso); + } + return; + } + else if (i == &stg_IND_info) + { + // message was revoked + return; + } + else if (i == &stg_WHITEHOLE_info) + { + goto loop; + } + else + { + barf("executeMessage: %p", i); + } +} + +#endif + +/* ---------------------------------------------------------------------------- + Handle a MSG_BLACKHOLE message + + This is called from two places: either we just entered a BLACKHOLE + (stg_BLACKHOLE_info), or we received a MSG_BLACKHOLE in our + cap->inbox. + + We need to establish whether the BLACKHOLE belongs to + this Capability, and + - if so, arrange to block the current thread on it + - otherwise, forward the message to the right place + + Returns: + - 0 if the blocked thread can be woken up by the caller + - 1 if the thread is still blocked, and we promise to send a MSG_TRY_WAKEUP + at some point in the future. + + ------------------------------------------------------------------------- */ + +nat messageBlackHole(Capability *cap, MessageBlackHole *msg) +{ + const StgInfoTable *info; + StgClosure *p; + StgBlockingQueue *bq; + StgClosure *bh = msg->bh; + StgTSO *owner; + + debugTraceCap(DEBUG_sched, cap, "message: thread %d blocking on blackhole %p", + (lnat)msg->tso->id, msg->bh); + + info = bh->header.info; + + // If we got this message in our inbox, it might be that the + // BLACKHOLE has already been updated, and GC has shorted out the + // indirection, so the pointer no longer points to a BLACKHOLE at + // all. + if (info != &stg_BLACKHOLE_info && + info != &stg_CAF_BLACKHOLE_info && + info != &stg_WHITEHOLE_info) { + // if it is a WHITEHOLE, then a thread is in the process of + // trying to BLACKHOLE it. But we know that it was once a + // BLACKHOLE, so there is at least a valid pointer in the + // payload, so we can carry on. + return 0; + } + + // we know at this point that the closure +loop: + p = ((StgInd*)bh)->indirectee; + info = p->header.info; + + if (info == &stg_IND_info) + { + // This could happen, if e.g. we got a BLOCKING_QUEUE that has + // just been replaced with an IND by another thread in + // updateThunk(). In which case, if we read the indirectee + // again we should get the value. + goto loop; + } + + else if (info == &stg_TSO_info) + { + owner = deRefTSO((StgTSO *)p); + +#ifdef THREADED_RTS + if (owner->cap != cap) { + sendMessage(cap, owner->cap, (Message*)msg); + debugTraceCap(DEBUG_sched, cap, "forwarding message to cap %d", owner->cap->no); + return 1; + } +#endif + // owner is the owner of the BLACKHOLE, and resides on this + // Capability. msg->tso is the first thread to block on this + // BLACKHOLE, so we first create a BLOCKING_QUEUE object. + + bq = (StgBlockingQueue*)allocate(cap, sizeofW(StgBlockingQueue)); + + // initialise the BLOCKING_QUEUE object + SET_HDR(bq, &stg_BLOCKING_QUEUE_DIRTY_info, CCS_SYSTEM); + bq->bh = bh; + bq->queue = msg; + bq->owner = owner; + + msg->link = (MessageBlackHole*)END_TSO_QUEUE; + + // All BLOCKING_QUEUES are linked in a list on owner->bq, so + // that we can search through them in the event that there is + // a collision to update a BLACKHOLE and a BLOCKING_QUEUE + // becomes orphaned (see updateThunk()). + bq->link = owner->bq; + owner->bq = bq; + dirty_TSO(cap, owner); // we modified owner->bq + + // point to the BLOCKING_QUEUE from the BLACKHOLE + write_barrier(); // make the BQ visible + ((StgInd*)bh)->indirectee = (StgClosure *)bq; + recordClosureMutated(cap,bh); // bh was mutated + + debugTraceCap(DEBUG_sched, cap, "thread %d blocked on thread %d", + (lnat)msg->tso->id, (lnat)owner->id); + + return 1; // blocked + } + else if (info == &stg_BLOCKING_QUEUE_CLEAN_info || + info == &stg_BLOCKING_QUEUE_DIRTY_info) + { + StgBlockingQueue *bq = (StgBlockingQueue *)p; + + ASSERT(bq->bh == bh); + + owner = deRefTSO(bq->owner); + + ASSERT(owner != END_TSO_QUEUE); + +#ifdef THREADED_RTS + if (owner->cap != cap) { + sendMessage(cap, owner->cap, (Message*)msg); + debugTraceCap(DEBUG_sched, cap, "forwarding message to cap %d", owner->cap->no); + return 1; + } +#endif + + msg->link = bq->queue; + bq->queue = msg; + recordClosureMutated(cap,(StgClosure*)msg); + + if (info == &stg_BLOCKING_QUEUE_CLEAN_info) { + bq->header.info = &stg_BLOCKING_QUEUE_DIRTY_info; + recordClosureMutated(cap,bq); + } + + debugTraceCap(DEBUG_sched, cap, "thread %d blocked on thread %d", + (lnat)msg->tso->id, (lnat)owner->id); + + return 1; // blocked + } + + return 0; // not blocked +} + |