diff options
author | Edward Z. Yang <ezyang@mit.edu> | 2010-09-19 00:29:05 +0000 |
---|---|---|
committer | Edward Z. Yang <ezyang@mit.edu> | 2010-09-19 00:29:05 +0000 |
commit | 83d563cb9ede0ba792836e529b1e2929db926355 (patch) | |
tree | 1f9de77ebd24ca7a67894c51442b657d2f265630 /rts | |
parent | 9fa96fc44a640014415e1588f50ab7689285e6cb (diff) | |
download | haskell-83d563cb9ede0ba792836e529b1e2929db926355.tar.gz |
Interruptible FFI calls with pthread_kill and CancelSynchronousIO. v4
This is patch that adds support for interruptible FFI calls in the form
of a new foreign import keyword 'interruptible', which can be used
instead of 'safe' or 'unsafe'. Interruptible FFI calls act like safe
FFI calls, except that the worker thread they run on may be interrupted.
Internally, it replaces BlockedOnCCall_NoUnblockEx with
BlockedOnCCall_Interruptible, and changes the behavior of the RTS
to not modify the TSO_ flags on the event of an FFI call from
a thread that was interruptible. It also modifies the bytecode
format for foreign call, adding an extra Word16 to indicate
interruptibility.
The semantics of interruption vary from platform to platform, but the
intent is that any blocking system calls are aborted with an error code.
This is most useful for making function calls to system library
functions that support interrupting. There is no support for pre-Vista
Windows.
There is a partner testsuite patch which adds several tests for this
functionality.
Diffstat (limited to 'rts')
-rw-r--r-- | rts/Interpreter.c | 3 | ||||
-rw-r--r-- | rts/RaiseAsync.c | 25 | ||||
-rw-r--r-- | rts/Schedule.c | 23 | ||||
-rw-r--r-- | rts/Task.c | 9 | ||||
-rw-r--r-- | rts/Task.h | 5 | ||||
-rw-r--r-- | rts/Threads.c | 4 | ||||
-rw-r--r-- | rts/posix/OSThreads.c | 10 | ||||
-rw-r--r-- | rts/sm/MarkWeak.c | 4 | ||||
-rw-r--r-- | rts/win32/OSThreads.c | 19 |
9 files changed, 85 insertions, 17 deletions
diff --git a/rts/Interpreter.c b/rts/Interpreter.c index 9a38a7ed18..da7ee2196a 100644 --- a/rts/Interpreter.c +++ b/rts/Interpreter.c @@ -1356,6 +1356,7 @@ run_BCO: void *tok; int stk_offset = BCO_NEXT; int o_itbl = BCO_NEXT; + int interruptible = BCO_NEXT; void(*marshall_fn)(void*) = (void (*)(void*))BCO_LIT(o_itbl); int ret_dyn_size = RET_DYN_BITMAP_SIZE + RET_DYN_NONPTR_REGS_SIZE @@ -1444,7 +1445,7 @@ run_BCO: ((StgRetDyn *)Sp)->payload[0] = (StgClosure *)obj; SAVE_STACK_POINTERS; - tok = suspendThread(&cap->r); + tok = suspendThread(&cap->r, interruptible ? rtsTrue : rtsFalse); // We already made a copy of the arguments above. ffi_call(cif, fn, ret, argptrs); diff --git a/rts/RaiseAsync.c b/rts/RaiseAsync.c index ad830cf322..b94ccea283 100644 --- a/rts/RaiseAsync.c +++ b/rts/RaiseAsync.c @@ -127,7 +127,7 @@ suspendComputation(Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here) Capability, and it is - NotBlocked, BlockedOnMsgThrowTo, - BlockedOnCCall + BlockedOnCCall_Interruptible - or it is masking exceptions (TSO_BLOCKEX) @@ -392,8 +392,29 @@ check_target: return THROWTO_SUCCESS; } + case BlockedOnCCall_Interruptible: +#ifdef THREADED_RTS + { + Task *task = NULL; + // walk suspended_ccalls to find the correct worker thread + InCall *incall; + for (incall = cap->suspended_ccalls; incall != NULL; incall = incall->next) { + if (incall->suspended_tso == target) { + task = incall->task; + break; + } + } + if (task != NULL) { + raiseAsync(cap, target, msg->exception, rtsFalse, NULL); + interruptWorkerTask(task); + return THROWTO_SUCCESS; + } else { + debugTraceCap(DEBUG_sched, cap, "throwTo: could not find worker thread to kill"); + } + // fall to next + } +#endif case BlockedOnCCall: - case BlockedOnCCall_NoUnblockExc: blockedThrowTo(cap,target,msg); return THROWTO_BLOCKED; diff --git a/rts/Schedule.c b/rts/Schedule.c index 8db125da74..0850749b36 100644 --- a/rts/Schedule.c +++ b/rts/Schedule.c @@ -1716,13 +1716,17 @@ recoverSuspendedTask (Capability *cap, Task *task) * the whole system. * * The Haskell thread making the C call is put to sleep for the - * duration of the call, on the susepended_ccalling_threads queue. We + * duration of the call, on the suspended_ccalling_threads queue. We * give out a token to the task, which it can use to resume the thread * on return from the C function. + * + * If this is an interruptible C call, this means that the FFI call may be + * unceremoniously terminated and should be scheduled on an + * unbound worker thread. * ------------------------------------------------------------------------- */ void * -suspendThread (StgRegTable *reg) +suspendThread (StgRegTable *reg, rtsBool interruptible) { Capability *cap; int saved_errno; @@ -1751,12 +1755,10 @@ suspendThread (StgRegTable *reg) threadPaused(cap,tso); - if ((tso->flags & TSO_BLOCKEX) == 0) { - tso->why_blocked = BlockedOnCCall; - tso->flags |= TSO_BLOCKEX; - tso->flags &= ~TSO_INTERRUPTIBLE; + if (interruptible) { + tso->why_blocked = BlockedOnCCall_Interruptible; } else { - tso->why_blocked = BlockedOnCCall_NoUnblockExc; + tso->why_blocked = BlockedOnCCall; } // Hand back capability @@ -1815,12 +1817,11 @@ resumeThread (void *task_) traceEventRunThread(cap, tso); - if (tso->why_blocked == BlockedOnCCall) { + if ((tso->flags & TSO_BLOCKEX) == 0) { // avoid locking the TSO if we don't have to if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) { awakenBlockedExceptionQueue(cap,tso); } - tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE); } /* Reset blocking status */ @@ -2331,7 +2332,7 @@ deleteThread (Capability *cap STG_UNUSED, StgTSO *tso) // we must own all Capabilities. if (tso->why_blocked != BlockedOnCCall && - tso->why_blocked != BlockedOnCCall_NoUnblockExc) { + tso->why_blocked != BlockedOnCCall_Interruptible) { throwToSingleThreaded(tso->cap,tso,NULL); } } @@ -2343,7 +2344,7 @@ deleteThread_(Capability *cap, StgTSO *tso) // like deleteThread(), but we delete threads in foreign calls, too. if (tso->why_blocked == BlockedOnCCall || - tso->why_blocked == BlockedOnCCall_NoUnblockExc) { + tso->why_blocked == BlockedOnCCall_Interruptible) { tso->what_next = ThreadKilled; appendToRunQueue(tso->cap, tso); } else { diff --git a/rts/Task.c b/rts/Task.c index e93d60d86f..f26785a1be 100644 --- a/rts/Task.c +++ b/rts/Task.c @@ -409,6 +409,15 @@ startWorkerTask (Capability *cap) RELEASE_LOCK(&task->lock); } +void +interruptWorkerTask (Task *task) +{ + ASSERT(osThreadId() != task->id); // seppuku not allowed + ASSERT(task->incall->suspended_tso); // use this only for FFI calls + interruptOSThread(task->id); + debugTrace(DEBUG_sched, "interrupted worker task %lu", task->id); +} + #endif /* THREADED_RTS */ #ifdef DEBUG diff --git a/rts/Task.h b/rts/Task.h index 566c0425cd..38e4763b5a 100644 --- a/rts/Task.h +++ b/rts/Task.h @@ -225,6 +225,11 @@ INLINE_HEADER Task *myTask (void); // void startWorkerTask (Capability *cap); +// Interrupts a worker task that is performing an FFI call. The thread +// should not be destroyed. +// +void interruptWorkerTask (Task *task); + #endif /* THREADED_RTS */ // ----------------------------------------------------------------------------- diff --git a/rts/Threads.c b/rts/Threads.c index 6635ed51e5..7344134a7d 100644 --- a/rts/Threads.c +++ b/rts/Threads.c @@ -492,8 +492,8 @@ printThreadBlockage(StgTSO *tso) case BlockedOnCCall: debugBelch("is blocked on an external call"); break; - case BlockedOnCCall_NoUnblockExc: - debugBelch("is blocked on an external call (exceptions were already blocked)"); + case BlockedOnCCall_Interruptible: + debugBelch("is blocked on an external call (but may be interrupted)"); break; case BlockedOnSTM: debugBelch("is blocked on an STM operation"); diff --git a/rts/posix/OSThreads.c b/rts/posix/OSThreads.c index 343536e063..283155345c 100644 --- a/rts/posix/OSThreads.c +++ b/rts/posix/OSThreads.c @@ -57,6 +57,10 @@ #include <mach/mach.h> #endif +#ifdef HAVE_SIGNAL_H +# include <signal.h> +#endif + /* * This (allegedly) OS threads independent layer was initially * abstracted away from code that used Pthreads, so the functions @@ -290,6 +294,12 @@ setThreadAffinity (nat n GNUC3_ATTRIBUTE(__unused__), } #endif +void +interruptOSThread (OSThreadId id) +{ + pthread_kill(id, SIGPIPE); +} + #else /* !defined(THREADED_RTS) */ int diff --git a/rts/sm/MarkWeak.c b/rts/sm/MarkWeak.c index e65c176c0a..d4d708e72c 100644 --- a/rts/sm/MarkWeak.c +++ b/rts/sm/MarkWeak.c @@ -270,8 +270,10 @@ static rtsBool tidyThreadList (generation *gen) // if the thread is not masking exceptions but there are // pending exceptions on its queue, then something has gone - // wrong: + // wrong. However, pending exceptions are OK if there is an + // uninterruptible FFI call. ASSERT(t->blocked_exceptions == END_BLOCKED_EXCEPTIONS_QUEUE + || t->why_blocked == BlockedOnCCall || (t->flags & TSO_BLOCKEX)); if (tmp == NULL) { diff --git a/rts/win32/OSThreads.c b/rts/win32/OSThreads.c index cb00bd602d..44db42fef4 100644 --- a/rts/win32/OSThreads.c +++ b/rts/win32/OSThreads.c @@ -269,6 +269,25 @@ setThreadAffinity (nat n, nat m) // cap N of M } } +typedef BOOL (WINAPI *PCSIO)(HANDLE); + +void +interruptOSThread (OSThreadId id) +{ + HANDLE hdl; + PCSIO pCSIO; + if (!(hdl = OpenThread(THREAD_TERMINATE,FALSE,id))) { + sysErrorBelch("interruptOSThread: OpenThread"); + stg_exit(EXIT_FAILURE); + } + pCSIO = (PCSIO) GetProcAddress(GetModuleHandle(TEXT("Kernel32.dll")), "CancelSynchronousIo"); + if ( NULL != pCSIO ) { + pCSIO(hdl); + } else { + // Nothing to do, unfortunately + } +} + #else /* !defined(THREADED_RTS) */ int |