summaryrefslogtreecommitdiff
path: root/rts
diff options
context:
space:
mode:
authorEdward Z. Yang <ezyang@mit.edu>2010-09-19 00:29:05 +0000
committerEdward Z. Yang <ezyang@mit.edu>2010-09-19 00:29:05 +0000
commit83d563cb9ede0ba792836e529b1e2929db926355 (patch)
tree1f9de77ebd24ca7a67894c51442b657d2f265630 /rts
parent9fa96fc44a640014415e1588f50ab7689285e6cb (diff)
downloadhaskell-83d563cb9ede0ba792836e529b1e2929db926355.tar.gz
Interruptible FFI calls with pthread_kill and CancelSynchronousIO. v4
This is patch that adds support for interruptible FFI calls in the form of a new foreign import keyword 'interruptible', which can be used instead of 'safe' or 'unsafe'. Interruptible FFI calls act like safe FFI calls, except that the worker thread they run on may be interrupted. Internally, it replaces BlockedOnCCall_NoUnblockEx with BlockedOnCCall_Interruptible, and changes the behavior of the RTS to not modify the TSO_ flags on the event of an FFI call from a thread that was interruptible. It also modifies the bytecode format for foreign call, adding an extra Word16 to indicate interruptibility. The semantics of interruption vary from platform to platform, but the intent is that any blocking system calls are aborted with an error code. This is most useful for making function calls to system library functions that support interrupting. There is no support for pre-Vista Windows. There is a partner testsuite patch which adds several tests for this functionality.
Diffstat (limited to 'rts')
-rw-r--r--rts/Interpreter.c3
-rw-r--r--rts/RaiseAsync.c25
-rw-r--r--rts/Schedule.c23
-rw-r--r--rts/Task.c9
-rw-r--r--rts/Task.h5
-rw-r--r--rts/Threads.c4
-rw-r--r--rts/posix/OSThreads.c10
-rw-r--r--rts/sm/MarkWeak.c4
-rw-r--r--rts/win32/OSThreads.c19
9 files changed, 85 insertions, 17 deletions
diff --git a/rts/Interpreter.c b/rts/Interpreter.c
index 9a38a7ed18..da7ee2196a 100644
--- a/rts/Interpreter.c
+++ b/rts/Interpreter.c
@@ -1356,6 +1356,7 @@ run_BCO:
void *tok;
int stk_offset = BCO_NEXT;
int o_itbl = BCO_NEXT;
+ int interruptible = BCO_NEXT;
void(*marshall_fn)(void*) = (void (*)(void*))BCO_LIT(o_itbl);
int ret_dyn_size =
RET_DYN_BITMAP_SIZE + RET_DYN_NONPTR_REGS_SIZE
@@ -1444,7 +1445,7 @@ run_BCO:
((StgRetDyn *)Sp)->payload[0] = (StgClosure *)obj;
SAVE_STACK_POINTERS;
- tok = suspendThread(&cap->r);
+ tok = suspendThread(&cap->r, interruptible ? rtsTrue : rtsFalse);
// We already made a copy of the arguments above.
ffi_call(cif, fn, ret, argptrs);
diff --git a/rts/RaiseAsync.c b/rts/RaiseAsync.c
index ad830cf322..b94ccea283 100644
--- a/rts/RaiseAsync.c
+++ b/rts/RaiseAsync.c
@@ -127,7 +127,7 @@ suspendComputation(Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here)
Capability, and it is
- NotBlocked, BlockedOnMsgThrowTo,
- BlockedOnCCall
+ BlockedOnCCall_Interruptible
- or it is masking exceptions (TSO_BLOCKEX)
@@ -392,8 +392,29 @@ check_target:
return THROWTO_SUCCESS;
}
+ case BlockedOnCCall_Interruptible:
+#ifdef THREADED_RTS
+ {
+ Task *task = NULL;
+ // walk suspended_ccalls to find the correct worker thread
+ InCall *incall;
+ for (incall = cap->suspended_ccalls; incall != NULL; incall = incall->next) {
+ if (incall->suspended_tso == target) {
+ task = incall->task;
+ break;
+ }
+ }
+ if (task != NULL) {
+ raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
+ interruptWorkerTask(task);
+ return THROWTO_SUCCESS;
+ } else {
+ debugTraceCap(DEBUG_sched, cap, "throwTo: could not find worker thread to kill");
+ }
+ // fall to next
+ }
+#endif
case BlockedOnCCall:
- case BlockedOnCCall_NoUnblockExc:
blockedThrowTo(cap,target,msg);
return THROWTO_BLOCKED;
diff --git a/rts/Schedule.c b/rts/Schedule.c
index 8db125da74..0850749b36 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -1716,13 +1716,17 @@ recoverSuspendedTask (Capability *cap, Task *task)
* the whole system.
*
* The Haskell thread making the C call is put to sleep for the
- * duration of the call, on the susepended_ccalling_threads queue. We
+ * duration of the call, on the suspended_ccalling_threads queue. We
* give out a token to the task, which it can use to resume the thread
* on return from the C function.
+ *
+ * If this is an interruptible C call, this means that the FFI call may be
+ * unceremoniously terminated and should be scheduled on an
+ * unbound worker thread.
* ------------------------------------------------------------------------- */
void *
-suspendThread (StgRegTable *reg)
+suspendThread (StgRegTable *reg, rtsBool interruptible)
{
Capability *cap;
int saved_errno;
@@ -1751,12 +1755,10 @@ suspendThread (StgRegTable *reg)
threadPaused(cap,tso);
- if ((tso->flags & TSO_BLOCKEX) == 0) {
- tso->why_blocked = BlockedOnCCall;
- tso->flags |= TSO_BLOCKEX;
- tso->flags &= ~TSO_INTERRUPTIBLE;
+ if (interruptible) {
+ tso->why_blocked = BlockedOnCCall_Interruptible;
} else {
- tso->why_blocked = BlockedOnCCall_NoUnblockExc;
+ tso->why_blocked = BlockedOnCCall;
}
// Hand back capability
@@ -1815,12 +1817,11 @@ resumeThread (void *task_)
traceEventRunThread(cap, tso);
- if (tso->why_blocked == BlockedOnCCall) {
+ if ((tso->flags & TSO_BLOCKEX) == 0) {
// avoid locking the TSO if we don't have to
if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
awakenBlockedExceptionQueue(cap,tso);
}
- tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
}
/* Reset blocking status */
@@ -2331,7 +2332,7 @@ deleteThread (Capability *cap STG_UNUSED, StgTSO *tso)
// we must own all Capabilities.
if (tso->why_blocked != BlockedOnCCall &&
- tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
+ tso->why_blocked != BlockedOnCCall_Interruptible) {
throwToSingleThreaded(tso->cap,tso,NULL);
}
}
@@ -2343,7 +2344,7 @@ deleteThread_(Capability *cap, StgTSO *tso)
// like deleteThread(), but we delete threads in foreign calls, too.
if (tso->why_blocked == BlockedOnCCall ||
- tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
+ tso->why_blocked == BlockedOnCCall_Interruptible) {
tso->what_next = ThreadKilled;
appendToRunQueue(tso->cap, tso);
} else {
diff --git a/rts/Task.c b/rts/Task.c
index e93d60d86f..f26785a1be 100644
--- a/rts/Task.c
+++ b/rts/Task.c
@@ -409,6 +409,15 @@ startWorkerTask (Capability *cap)
RELEASE_LOCK(&task->lock);
}
+void
+interruptWorkerTask (Task *task)
+{
+ ASSERT(osThreadId() != task->id); // seppuku not allowed
+ ASSERT(task->incall->suspended_tso); // use this only for FFI calls
+ interruptOSThread(task->id);
+ debugTrace(DEBUG_sched, "interrupted worker task %lu", task->id);
+}
+
#endif /* THREADED_RTS */
#ifdef DEBUG
diff --git a/rts/Task.h b/rts/Task.h
index 566c0425cd..38e4763b5a 100644
--- a/rts/Task.h
+++ b/rts/Task.h
@@ -225,6 +225,11 @@ INLINE_HEADER Task *myTask (void);
//
void startWorkerTask (Capability *cap);
+// Interrupts a worker task that is performing an FFI call. The thread
+// should not be destroyed.
+//
+void interruptWorkerTask (Task *task);
+
#endif /* THREADED_RTS */
// -----------------------------------------------------------------------------
diff --git a/rts/Threads.c b/rts/Threads.c
index 6635ed51e5..7344134a7d 100644
--- a/rts/Threads.c
+++ b/rts/Threads.c
@@ -492,8 +492,8 @@ printThreadBlockage(StgTSO *tso)
case BlockedOnCCall:
debugBelch("is blocked on an external call");
break;
- case BlockedOnCCall_NoUnblockExc:
- debugBelch("is blocked on an external call (exceptions were already blocked)");
+ case BlockedOnCCall_Interruptible:
+ debugBelch("is blocked on an external call (but may be interrupted)");
break;
case BlockedOnSTM:
debugBelch("is blocked on an STM operation");
diff --git a/rts/posix/OSThreads.c b/rts/posix/OSThreads.c
index 343536e063..283155345c 100644
--- a/rts/posix/OSThreads.c
+++ b/rts/posix/OSThreads.c
@@ -57,6 +57,10 @@
#include <mach/mach.h>
#endif
+#ifdef HAVE_SIGNAL_H
+# include <signal.h>
+#endif
+
/*
* This (allegedly) OS threads independent layer was initially
* abstracted away from code that used Pthreads, so the functions
@@ -290,6 +294,12 @@ setThreadAffinity (nat n GNUC3_ATTRIBUTE(__unused__),
}
#endif
+void
+interruptOSThread (OSThreadId id)
+{
+ pthread_kill(id, SIGPIPE);
+}
+
#else /* !defined(THREADED_RTS) */
int
diff --git a/rts/sm/MarkWeak.c b/rts/sm/MarkWeak.c
index e65c176c0a..d4d708e72c 100644
--- a/rts/sm/MarkWeak.c
+++ b/rts/sm/MarkWeak.c
@@ -270,8 +270,10 @@ static rtsBool tidyThreadList (generation *gen)
// if the thread is not masking exceptions but there are
// pending exceptions on its queue, then something has gone
- // wrong:
+ // wrong. However, pending exceptions are OK if there is an
+ // uninterruptible FFI call.
ASSERT(t->blocked_exceptions == END_BLOCKED_EXCEPTIONS_QUEUE
+ || t->why_blocked == BlockedOnCCall
|| (t->flags & TSO_BLOCKEX));
if (tmp == NULL) {
diff --git a/rts/win32/OSThreads.c b/rts/win32/OSThreads.c
index cb00bd602d..44db42fef4 100644
--- a/rts/win32/OSThreads.c
+++ b/rts/win32/OSThreads.c
@@ -269,6 +269,25 @@ setThreadAffinity (nat n, nat m) // cap N of M
}
}
+typedef BOOL (WINAPI *PCSIO)(HANDLE);
+
+void
+interruptOSThread (OSThreadId id)
+{
+ HANDLE hdl;
+ PCSIO pCSIO;
+ if (!(hdl = OpenThread(THREAD_TERMINATE,FALSE,id))) {
+ sysErrorBelch("interruptOSThread: OpenThread");
+ stg_exit(EXIT_FAILURE);
+ }
+ pCSIO = (PCSIO) GetProcAddress(GetModuleHandle(TEXT("Kernel32.dll")), "CancelSynchronousIo");
+ if ( NULL != pCSIO ) {
+ pCSIO(hdl);
+ } else {
+ // Nothing to do, unfortunately
+ }
+}
+
#else /* !defined(THREADED_RTS) */
int