diff options
author | David Eichmann <EichmannD@gmail.com> | 2020-10-19 17:03:41 +0100 |
---|---|---|
committer | Marge Bot <ben+marge-bot@smart-cactus.org> | 2020-11-02 23:46:19 -0500 |
commit | 81006a06736c7300626f9d692a118b493b585cd5 (patch) | |
tree | ef2e3ee8d4e2a3f21ab2f266edd4317abf6747f2 /rts | |
parent | 0b7722219ffdb109c3a8b034a8e112d18e6e4336 (diff) | |
download | haskell-81006a06736c7300626f9d692a118b493b585cd5.tar.gz |
RtsAPI: pause and resume the RTS
The `rts_pause` and `rts_resume` functions have been added to `RtsAPI.h` and
allow an external process to completely pause and resume the RTS.
Co-authored-by: Sven Tennie <sven.tennie@gmail.com>
Co-authored-by: Matthew Pickering <matthewtpickering@gmail.com>
Co-authored-by: Ben Gamari <bgamari.foss@gmail.com>
Diffstat (limited to 'rts')
-rw-r--r-- | rts/Capability.c | 10 | ||||
-rw-r--r-- | rts/RtsAPI.c | 184 | ||||
-rw-r--r-- | rts/Schedule.c | 33 | ||||
-rw-r--r-- | rts/Task.c | 13 | ||||
-rw-r--r-- | rts/Task.h | 17 | ||||
-rw-r--r-- | rts/sm/NonMoving.c | 2 |
6 files changed, 226 insertions, 33 deletions
diff --git a/rts/Capability.c b/rts/Capability.c index aedce0dd8e..8dddce7028 100644 --- a/rts/Capability.c +++ b/rts/Capability.c @@ -858,7 +858,15 @@ void waitForCapability (Capability **pCap, Task *task) /* See Note [GC livelock] in Schedule.c for why we have gcAllowed and return the bool */ bool /* Did we GC? */ -yieldCapability (Capability** pCap, Task *task, bool gcAllowed) +yieldCapability + ( Capability** pCap // [in/out] Task's owned capability. Set to the + // newly owned capability on return. + // Precondition: + // pCap != NULL + // && *pCap != NULL + , Task *task // [in] This thread's task. + , bool gcAllowed + ) { Capability *cap = *pCap; diff --git a/rts/RtsAPI.c b/rts/RtsAPI.c index 1d8e0bc1c8..bf58f53735 100644 --- a/rts/RtsAPI.c +++ b/rts/RtsAPI.c @@ -423,6 +423,10 @@ createStrictIOThread(Capability *cap, W_ stack_size, StgClosure *closure) /* ---------------------------------------------------------------------------- Evaluating Haskell expressions + + The running task (capability->running_task) must be bounded i.e. you must + call newBoundTask() before calling these functions. Note that rts_lock() and + rts_pause() both call newBoundTask(). ------------------------------------------------------------------------- */ void rts_eval (/* inout */ Capability **cap, @@ -597,12 +601,23 @@ rts_getSchedStatus (Capability *cap) return cap->running_task->incall->rstat; } +#if defined(THREADED_RTS) +// The task that paused the RTS. The rts_pausing_task variable is owned by the +// task that owns all capabilities (there is at most one such task). +// +// It's possible to remove this and instead define the pausing task as whichever +// task owns all capabilities, but using `rts_pausing_task` leads to marginally +// cleaner code/API and better error messages. +Task * rts_pausing_task = NULL; +#endif + Capability * rts_lock (void) { Capability *cap; Task *task; + // Bound the current task. This is necessary to support rts_eval* functions. task = newBoundTask(); if (task->running_finalizers) { @@ -613,6 +628,14 @@ rts_lock (void) stg_exit(EXIT_FAILURE); } +#if defined(THREADED_RTS) + if (rts_pausing_task == task) { + errorBelch("error: rts_lock: The RTS is already paused by this thread.\n" + " There is no need to call rts_lock if you have already called rts_pause."); + stg_exit(EXIT_FAILURE); + } +#endif + cap = NULL; waitForCapability(&cap, task); @@ -640,21 +663,21 @@ rts_unlock (Capability *cap) task = cap->running_task; ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task); - // Now release the Capability. With the capability released, GC - // may happen. NB. does not try to put the current Task on the + // Now release the Capability. With the capability released, GC + // may happen. NB. does not try to put the current Task on the // worker queue. - // NB. keep cap->lock held while we call boundTaskExiting(). This + // NB. keep cap->lock held while we call exitMyTask(). This // is necessary during shutdown, where we want the invariant that // after shutdownCapability(), all the Tasks associated with the - // Capability have completed their shutdown too. Otherwise we - // could have boundTaskExiting()/workerTaskStop() running at some + // Capability have completed their shutdown too. Otherwise we + // could have exitMyTask()/workerTaskStop() running at some // random point in the future, which causes problems for // freeTaskManager(). ACQUIRE_LOCK(&cap->lock); releaseCapability_(cap,false); // Finally, we can release the Task to the free list. - boundTaskExiting(task); + exitMyTask(); RELEASE_LOCK(&cap->lock); if (task->incall == NULL) { @@ -665,6 +688,153 @@ rts_unlock (Capability *cap) } } +struct PauseToken_ { + Capability *capability; +}; + +Capability *pauseTokenCapability(PauseToken *pauseToken) { + return pauseToken->capability; +} + +#if defined(THREADED_RTS) + +// See Note [Locking and Pausing the RTS] +PauseToken *rts_pause (void) +{ + // It is an error if this thread already paused the RTS. If another + // thread has paused the RTS, then rts_pause will block until rts_resume is + // called (and compete with other threads calling rts_pause). The blocking + // behavior is implied by the use of `stopAllCapabilities`. + Task * task = getMyTask(); + if (rts_pausing_task == task) + { + // This task already pased the RTS. + errorBelch("error: rts_pause: This thread has already paused the RTS."); + stg_exit(EXIT_FAILURE); + } + + // The current task must not own a capability. This is true for non-worker + // threads e.g. when making a safe FFI call. We allow pausing when + // `task->cap->running_task != task` because the capability can be taken by + // other capabilities. Doing this check is justified because rts_pause is a + // user facing function and we want good error reporting. We also don't + // expect rts_pause to be performance critical. + if (task->cap && task->cap->running_task == task) + { + // This task owns a capability (and it can't be taken by other capabilities). + errorBelch(task->cap->in_haskell + ? ("error: rts_pause: attempting to pause via an unsafe FFI call.\n" + " Perhaps a 'foreign import unsafe' should be 'safe'?") + : ("error: rts_pause: attempting to pause from a Task that owns a capability.\n" + " Have you already acquired a capability e.g. with rts_lock?")); + stg_exit(EXIT_FAILURE); + } + + // Bound the current task. This is necessary to support rts_eval* functions. + task = newBoundTask(); + stopAllCapabilities(NULL, task); + + // Now we own all capabilities so we own rts_pausing_task and may set it. + rts_pausing_task = task; + + PauseToken *token = malloc(sizeof(PauseToken)); + token->capability = task->cap; + return token; +} + +static void assert_isPausedOnMyTask(const char *functionName); + +// See Note [Locking and Pausing the RTS]. The pauseToken argument is here just +// for symmetry with rts_pause and to match the pattern of rts_lock/rts_unlock. +void rts_resume (PauseToken *pauseToken) +{ + assert_isPausedOnMyTask("rts_resume"); + Task * task = getMyTask(); + + // Now we own all capabilities so we own rts_pausing_task and may write to + // it. + rts_pausing_task = NULL; + + // releaseAllCapabilities will not block because the current task owns all + // capabilities. + releaseAllCapabilities(n_capabilities, NULL, task); + exitMyTask(); + free(pauseToken); +} + +// See RtsAPI.h +bool rts_isPaused(void) +{ + return rts_pausing_task != NULL; +} + +// Check that the rts_pause was called on this thread/task and this thread owns +// all capabilities. If not, outputs an error and exits with EXIT_FAILURE. +static void assert_isPausedOnMyTask(const char *functionName) +{ + Task * task = getMyTask(); + if (rts_pausing_task == NULL) + { + errorBelch ( + "error: %s: the rts is not paused. Did you forget to call rts_pause?", + functionName); + stg_exit(EXIT_FAILURE); + } + + if (task != rts_pausing_task) + { + // We don't have ownership of rts_pausing_task, so it may have changed + // just after the above read. Still, we are garanteed that + // rts_pausing_task won't be set to the current task (because the + // current task is here now!), so the error messages are still correct. + errorBelch ( + "error: %s: called from a different OS thread than rts_pause.", + functionName); + + stg_exit(EXIT_FAILURE); + } + + // Check that we own all capabilities. + for (unsigned int i = 0; i < n_capabilities; i++) + { + Capability *cap = capabilities[i]; + if (cap->running_task != task) + { + errorBelch ( + "error: %s: the pausing thread does not own all capabilities.\n" + " Have you manually released a capability after calling rts_pause?", + functionName); + stg_exit(EXIT_FAILURE); + } + } +} + + +#else +PauseToken GNU_ATTRIBUTE(__noreturn__) +*rts_pause (void) +{ + errorBelch("Warning: Pausing the RTS is only possible for " + "multithreaded RTS."); + stg_exit(EXIT_FAILURE); +} + +void GNU_ATTRIBUTE(__noreturn__) +rts_resume (PauseToken *pauseToken STG_UNUSED) +{ + errorBelch("Warning: Resuming the RTS is only possible for " + "multithreaded RTS."); + stg_exit(EXIT_FAILURE); +} + +bool rts_isPaused() +{ + errorBelch("Warning: Pausing/Resuming the RTS is only possible for " + "multithreaded RTS."); + return false; +} +#endif + void rts_done (void) { freeMyTask(); @@ -700,7 +870,7 @@ void rts_done (void) void hs_try_putmvar (/* in */ int capability, /* in */ HsStablePtr mvar) { - Task *task = getTask(); + Task *task = getMyTask(); Capability *cap; Capability *task_old_cap USED_IF_THREADS; diff --git a/rts/Schedule.c b/rts/Schedule.c index 41d0dba953..75a6f545ec 100644 --- a/rts/Schedule.c +++ b/rts/Schedule.c @@ -1394,7 +1394,15 @@ scheduleNeedHeapProfile( bool ready_to_gc ) * -------------------------------------------------------------------------- */ #if defined(THREADED_RTS) -void stopAllCapabilities (Capability **pCap, Task *task) +void stopAllCapabilities + ( Capability **pCap // [in/out] This thread's task's owned capability. + // pCap may be NULL if no capability is owned. + // Else *pCap != NULL + // On return, set to the task's newly owned + // capability (task->cap). Though, the Task will + // technically own all capabilities. + , Task *task // [in] This thread's task. + ) { stopAllCapabilitiesWith(pCap, task, SYNC_OTHER); } @@ -1446,9 +1454,16 @@ void stopAllCapabilitiesWith (Capability **pCap, Task *task, SyncType sync_type) * -------------------------------------------------------------------------- */ #if defined(THREADED_RTS) -static bool requestSync ( - Capability **pcap, Task *task, PendingSync *new_sync, - SyncType *prev_sync_type) +static bool requestSync + ( Capability **pcap // [in/out] This thread's task's owned capability. + // May change if there is an existing sync (true is returned). + // Precondition: + // pcap may be NULL + // *pcap != NULL + , Task *task // [in] This thread's task. + , PendingSync *new_sync // [in] The new requested sync. + , SyncType *prev_sync_type // [out] Only set if there is an existing sync (true is returned). + ) { PendingSync *sync; @@ -1542,7 +1557,7 @@ static void acquireAllCapabilities(Capability *cap, Task *task) void releaseAllCapabilities(uint32_t n, Capability *keep_cap, Task *task) { uint32_t i; - + ASSERT( task != NULL); for (i = 0; i < n; i++) { Capability *tmpcap = capabilities[i]; if (keep_cap != tmpcap) { @@ -2065,7 +2080,7 @@ forkProcess(HsStablePtr *entry RELEASE_LOCK(&capabilities[i]->lock); } - boundTaskExiting(task); + exitMyTask(); // just return the pid return pid; @@ -2584,6 +2599,7 @@ scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso) #endif } +// See includes/rts/Threads.h void scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap) { @@ -2610,6 +2626,7 @@ scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap) DEBUG_ONLY( id = tso->id ); debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)id); + // As the TSO is bound and on the run queue, schedule() will run the TSO. cap = schedule(cap,task); ASSERT(task->incall->rstat != NoStatus); @@ -2745,7 +2762,7 @@ exitScheduler (bool wait_foreign USED_IF_THREADS) // debugBelch("n_failed_trygrab_idles = %d, n_idle_caps = %d\n", // n_failed_trygrab_idles, n_idle_caps); - boundTaskExiting(task); + exitMyTask(); } void @@ -2804,7 +2821,7 @@ performGC_(bool force_major) waitForCapability(&cap,task); scheduleDoGC(&cap,task,force_major,false); releaseCapability(cap); - boundTaskExiting(task); + exitMyTask(); } void diff --git a/rts/Task.c b/rts/Task.c index 11ba5f1581..e5963dccc6 100644 --- a/rts/Task.c +++ b/rts/Task.c @@ -118,7 +118,7 @@ freeTaskManager (void) return tasksRunning; } -Task* getTask (void) +Task* getMyTask (void) { Task *task; @@ -306,7 +306,7 @@ newBoundTask (void) stg_exit(EXIT_FAILURE); } - task = getTask(); + task = getMyTask(); task->stopped = false; @@ -317,13 +317,12 @@ newBoundTask (void) } void -boundTaskExiting (Task *task) +exitMyTask (void) { + Task* task = myTask(); #if defined(THREADED_RTS) ASSERT(osThreadId() == task->id); #endif - ASSERT(myTask() == task); - endInCall(task); // Set task->stopped, but only if this is the last call (#4850). @@ -524,7 +523,7 @@ void rts_setInCallCapability ( int preferred_capability, int affinity USED_IF_THREADS) { - Task *task = getTask(); + Task *task = getMyTask(); task->preferred_capability = preferred_capability; #if defined(THREADED_RTS) @@ -541,7 +540,7 @@ void rts_pinThreadToNumaNode ( { #if defined(THREADED_RTS) if (RtsFlags.GcFlags.numa) { - Task *task = getTask(); + Task *task = getMyTask(); task->node = capNoToNumaNode(node); if (!DEBUG_IS_ON || !RtsFlags.DebugFlags.numa) { // faking NUMA setThreadNode(numa_map[task->node]); diff --git a/rts/Task.h b/rts/Task.h index 17bcbe2da4..6e366a5d9b 100644 --- a/rts/Task.h +++ b/rts/Task.h @@ -149,8 +149,8 @@ typedef struct Task_ { struct InCall_ *spare_incalls; bool worker; // == true if this is a worker Task - bool stopped; // == true between newBoundTask and - // boundTaskExiting, or in a worker Task. + bool stopped; // == false between newBoundTask and + // exitMyTask, or in a worker Task. // So that we can detect when a finalizer illegally calls back into Haskell bool running_finalizers; @@ -200,9 +200,9 @@ extern Mutex all_tasks_mutex; void initTaskManager (void); uint32_t freeTaskManager (void); -// Create a new Task for a bound thread. This Task must be released -// by calling boundTaskExiting. The Task is cached in -// thread-local storage and will remain even after boundTaskExiting() +// Create a new Task for a bound thread. This Task must be released +// by calling exitMyTask(). The Task is cached in +// thread-local storage and will remain even after exitMyTask() // has been called; to free the memory, see freeMyTask(). // Task* newBoundTask (void); @@ -210,11 +210,10 @@ Task* newBoundTask (void); // Return the current OS thread's Task, which is created if it doesn't already // exist. After you have finished using RTS APIs, you should call freeMyTask() // to release this thread's Task. -Task* getTask (void); +Task* getMyTask (void); -// The current task is a bound task that is exiting. -// -void boundTaskExiting (Task *task); +// Exit myTask - This is the counterpart of newBoundTask(). +void exitMyTask (void); // Free a Task if one was previously allocated by newBoundTask(). // This is not necessary unless the thread that called newBoundTask() diff --git a/rts/sm/NonMoving.c b/rts/sm/NonMoving.c index 388ceae2fd..05f8481fe2 100644 --- a/rts/sm/NonMoving.c +++ b/rts/sm/NonMoving.c @@ -1215,7 +1215,7 @@ static void nonmovingMark_(MarkQueue *mark_queue, StgWeak **dead_weaks, StgTSO * #if defined(THREADED_RTS) finish: - boundTaskExiting(task); + exitMyTask(); // We are done... mark_thread = 0; |