summaryrefslogtreecommitdiff
path: root/rts
diff options
context:
space:
mode:
authorDavid Eichmann <EichmannD@gmail.com>2020-10-19 17:03:41 +0100
committerMarge Bot <ben+marge-bot@smart-cactus.org>2020-11-02 23:46:19 -0500
commit81006a06736c7300626f9d692a118b493b585cd5 (patch)
treeef2e3ee8d4e2a3f21ab2f266edd4317abf6747f2 /rts
parent0b7722219ffdb109c3a8b034a8e112d18e6e4336 (diff)
downloadhaskell-81006a06736c7300626f9d692a118b493b585cd5.tar.gz
RtsAPI: pause and resume the RTS
The `rts_pause` and `rts_resume` functions have been added to `RtsAPI.h` and allow an external process to completely pause and resume the RTS. Co-authored-by: Sven Tennie <sven.tennie@gmail.com> Co-authored-by: Matthew Pickering <matthewtpickering@gmail.com> Co-authored-by: Ben Gamari <bgamari.foss@gmail.com>
Diffstat (limited to 'rts')
-rw-r--r--rts/Capability.c10
-rw-r--r--rts/RtsAPI.c184
-rw-r--r--rts/Schedule.c33
-rw-r--r--rts/Task.c13
-rw-r--r--rts/Task.h17
-rw-r--r--rts/sm/NonMoving.c2
6 files changed, 226 insertions, 33 deletions
diff --git a/rts/Capability.c b/rts/Capability.c
index aedce0dd8e..8dddce7028 100644
--- a/rts/Capability.c
+++ b/rts/Capability.c
@@ -858,7 +858,15 @@ void waitForCapability (Capability **pCap, Task *task)
/* See Note [GC livelock] in Schedule.c for why we have gcAllowed
and return the bool */
bool /* Did we GC? */
-yieldCapability (Capability** pCap, Task *task, bool gcAllowed)
+yieldCapability
+ ( Capability** pCap // [in/out] Task's owned capability. Set to the
+ // newly owned capability on return.
+ // Precondition:
+ // pCap != NULL
+ // && *pCap != NULL
+ , Task *task // [in] This thread's task.
+ , bool gcAllowed
+ )
{
Capability *cap = *pCap;
diff --git a/rts/RtsAPI.c b/rts/RtsAPI.c
index 1d8e0bc1c8..bf58f53735 100644
--- a/rts/RtsAPI.c
+++ b/rts/RtsAPI.c
@@ -423,6 +423,10 @@ createStrictIOThread(Capability *cap, W_ stack_size, StgClosure *closure)
/* ----------------------------------------------------------------------------
Evaluating Haskell expressions
+
+ The running task (capability->running_task) must be bound, i.e. you must
+ call newBoundTask() before calling these functions. Note that rts_lock() and
+ rts_pause() both call newBoundTask().
------------------------------------------------------------------------- */
void rts_eval (/* inout */ Capability **cap,
@@ -597,12 +601,23 @@ rts_getSchedStatus (Capability *cap)
return cap->running_task->incall->rstat;
}
+#if defined(THREADED_RTS)
+// The task that paused the RTS. The rts_pausing_task variable is owned by the
+// task that owns all capabilities (there is at most one such task).
+//
+// It's possible to remove this and instead define the pausing task as whichever
+// task owns all capabilities, but using `rts_pausing_task` leads to marginally
+// cleaner code/API and better error messages.
+Task * rts_pausing_task = NULL;
+#endif
+
Capability *
rts_lock (void)
{
Capability *cap;
Task *task;
+ // Bind the current task. This is necessary to support rts_eval* functions.
task = newBoundTask();
if (task->running_finalizers) {
@@ -613,6 +628,14 @@ rts_lock (void)
stg_exit(EXIT_FAILURE);
}
+#if defined(THREADED_RTS)
+ if (rts_pausing_task == task) {
+ errorBelch("error: rts_lock: The RTS is already paused by this thread.\n"
+ " There is no need to call rts_lock if you have already called rts_pause.");
+ stg_exit(EXIT_FAILURE);
+ }
+#endif
+
cap = NULL;
waitForCapability(&cap, task);
@@ -640,21 +663,21 @@ rts_unlock (Capability *cap)
task = cap->running_task;
ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
- // Now release the Capability. With the capability released, GC
- // may happen. NB. does not try to put the current Task on the
+ // Now release the Capability. With the capability released, GC
+ // may happen. NB. does not try to put the current Task on the
// worker queue.
- // NB. keep cap->lock held while we call boundTaskExiting(). This
+ // NB. keep cap->lock held while we call exitMyTask(). This
// is necessary during shutdown, where we want the invariant that
// after shutdownCapability(), all the Tasks associated with the
- // Capability have completed their shutdown too. Otherwise we
- // could have boundTaskExiting()/workerTaskStop() running at some
+ // Capability have completed their shutdown too. Otherwise we
+ // could have exitMyTask()/workerTaskStop() running at some
// random point in the future, which causes problems for
// freeTaskManager().
ACQUIRE_LOCK(&cap->lock);
releaseCapability_(cap,false);
// Finally, we can release the Task to the free list.
- boundTaskExiting(task);
+ exitMyTask();
RELEASE_LOCK(&cap->lock);
if (task->incall == NULL) {
@@ -665,6 +688,153 @@ rts_unlock (Capability *cap)
}
}
+struct PauseToken_ {
+ Capability *capability;
+};
+
+Capability *pauseTokenCapability(PauseToken *pauseToken) {
+ return pauseToken->capability;
+}
+
+#if defined(THREADED_RTS)
+
+// See Note [Locking and Pausing the RTS]
+PauseToken *rts_pause (void)
+{
+ // It is an error if this thread already paused the RTS. If another
+ // thread has paused the RTS, then rts_pause will block until rts_resume is
+ // called (and compete with other threads calling rts_pause). The blocking
+ // behavior is implied by the use of `stopAllCapabilities`.
+ Task * task = getMyTask();
+ if (rts_pausing_task == task)
+ {
+ // This task already paused the RTS.
+ errorBelch("error: rts_pause: This thread has already paused the RTS.");
+ stg_exit(EXIT_FAILURE);
+ }
+
+ // The current task must not own a capability. This is true for non-worker
+ // threads e.g. when making a safe FFI call. We allow pausing when
+ // `task->cap->running_task != task` because the capability can be taken by
+ // other capabilities. Doing this check is justified because rts_pause is a
+ // user facing function and we want good error reporting. We also don't
+ // expect rts_pause to be performance critical.
+ if (task->cap && task->cap->running_task == task)
+ {
+ // This task owns a capability (and it can't be taken by other capabilities).
+ errorBelch(task->cap->in_haskell
+ ? ("error: rts_pause: attempting to pause via an unsafe FFI call.\n"
+ " Perhaps a 'foreign import unsafe' should be 'safe'?")
+ : ("error: rts_pause: attempting to pause from a Task that owns a capability.\n"
+ " Have you already acquired a capability e.g. with rts_lock?"));
+ stg_exit(EXIT_FAILURE);
+ }
+
+ // Bind the current task. This is necessary to support rts_eval* functions.
+ task = newBoundTask();
+ stopAllCapabilities(NULL, task);
+
+ // Now we own all capabilities so we own rts_pausing_task and may set it.
+ rts_pausing_task = task;
+
+ PauseToken *token = malloc(sizeof(PauseToken));
+ token->capability = task->cap;
+ return token;
+}
+
+static void assert_isPausedOnMyTask(const char *functionName);
+
+// See Note [Locking and Pausing the RTS]. The pauseToken argument is here just
+// for symmetry with rts_pause and to match the pattern of rts_lock/rts_unlock.
+void rts_resume (PauseToken *pauseToken)
+{
+ assert_isPausedOnMyTask("rts_resume");
+ Task * task = getMyTask();
+
+ // Now we own all capabilities so we own rts_pausing_task and may write to
+ // it.
+ rts_pausing_task = NULL;
+
+ // releaseAllCapabilities will not block because the current task owns all
+ // capabilities.
+ releaseAllCapabilities(n_capabilities, NULL, task);
+ exitMyTask();
+ free(pauseToken);
+}
+
+// See RtsAPI.h
+bool rts_isPaused(void)
+{
+ return rts_pausing_task != NULL;
+}
+
+// Check that the rts_pause was called on this thread/task and this thread owns
+// all capabilities. If not, outputs an error and exits with EXIT_FAILURE.
+static void assert_isPausedOnMyTask(const char *functionName)
+{
+ Task * task = getMyTask();
+ if (rts_pausing_task == NULL)
+ {
+ errorBelch (
+ "error: %s: the rts is not paused. Did you forget to call rts_pause?",
+ functionName);
+ stg_exit(EXIT_FAILURE);
+ }
+
+ if (task != rts_pausing_task)
+ {
+ // We don't have ownership of rts_pausing_task, so it may have changed
+ // just after the above read. Still, we are guaranteed that
+ // rts_pausing_task won't be set to the current task (because the
+ // current task is here now!), so the error messages are still correct.
+ errorBelch (
+ "error: %s: called from a different OS thread than rts_pause.",
+ functionName);
+
+ stg_exit(EXIT_FAILURE);
+ }
+
+ // Check that we own all capabilities.
+ for (unsigned int i = 0; i < n_capabilities; i++)
+ {
+ Capability *cap = capabilities[i];
+ if (cap->running_task != task)
+ {
+ errorBelch (
+ "error: %s: the pausing thread does not own all capabilities.\n"
+ " Have you manually released a capability after calling rts_pause?",
+ functionName);
+ stg_exit(EXIT_FAILURE);
+ }
+ }
+}
+
+
+#else
+PauseToken GNU_ATTRIBUTE(__noreturn__)
+*rts_pause (void)
+{
+ errorBelch("Warning: Pausing the RTS is only possible for "
+ "multithreaded RTS.");
+ stg_exit(EXIT_FAILURE);
+}
+
+void GNU_ATTRIBUTE(__noreturn__)
+rts_resume (PauseToken *pauseToken STG_UNUSED)
+{
+ errorBelch("Warning: Resuming the RTS is only possible for "
+ "multithreaded RTS.");
+ stg_exit(EXIT_FAILURE);
+}
+
+bool rts_isPaused()
+{
+ errorBelch("Warning: Pausing/Resuming the RTS is only possible for "
+ "multithreaded RTS.");
+ return false;
+}
+#endif
+
void rts_done (void)
{
freeMyTask();
@@ -700,7 +870,7 @@ void rts_done (void)
void hs_try_putmvar (/* in */ int capability,
/* in */ HsStablePtr mvar)
{
- Task *task = getTask();
+ Task *task = getMyTask();
Capability *cap;
Capability *task_old_cap USED_IF_THREADS;
diff --git a/rts/Schedule.c b/rts/Schedule.c
index 41d0dba953..75a6f545ec 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -1394,7 +1394,15 @@ scheduleNeedHeapProfile( bool ready_to_gc )
* -------------------------------------------------------------------------- */
#if defined(THREADED_RTS)
-void stopAllCapabilities (Capability **pCap, Task *task)
+void stopAllCapabilities
+ ( Capability **pCap // [in/out] This thread's task's owned capability.
+ // pCap may be NULL if no capability is owned.
+ // Else *pCap != NULL
+ // On return, set to the task's newly owned
+ // capability (task->cap). Though, the Task will
+ // technically own all capabilities.
+ , Task *task // [in] This thread's task.
+ )
{
stopAllCapabilitiesWith(pCap, task, SYNC_OTHER);
}
@@ -1446,9 +1454,16 @@ void stopAllCapabilitiesWith (Capability **pCap, Task *task, SyncType sync_type)
* -------------------------------------------------------------------------- */
#if defined(THREADED_RTS)
-static bool requestSync (
- Capability **pcap, Task *task, PendingSync *new_sync,
- SyncType *prev_sync_type)
+static bool requestSync
+ ( Capability **pcap // [in/out] This thread's task's owned capability.
+ // May change if there is an existing sync (true is returned).
+ // Precondition:
+ // pcap may be NULL
+ // if pcap != NULL then *pcap != NULL
+ , Task *task // [in] This thread's task.
+ , PendingSync *new_sync // [in] The new requested sync.
+ , SyncType *prev_sync_type // [out] Only set if there is an existing sync (true is returned).
+ )
{
PendingSync *sync;
@@ -1542,7 +1557,7 @@ static void acquireAllCapabilities(Capability *cap, Task *task)
void releaseAllCapabilities(uint32_t n, Capability *keep_cap, Task *task)
{
uint32_t i;
-
+ ASSERT( task != NULL);
for (i = 0; i < n; i++) {
Capability *tmpcap = capabilities[i];
if (keep_cap != tmpcap) {
@@ -2065,7 +2080,7 @@ forkProcess(HsStablePtr *entry
RELEASE_LOCK(&capabilities[i]->lock);
}
- boundTaskExiting(task);
+ exitMyTask();
// just return the pid
return pid;
@@ -2584,6 +2599,7 @@ scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
#endif
}
+// See includes/rts/Threads.h
void
scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap)
{
@@ -2610,6 +2626,7 @@ scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap)
DEBUG_ONLY( id = tso->id );
debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)id);
+ // As the TSO is bound and on the run queue, schedule() will run the TSO.
cap = schedule(cap,task);
ASSERT(task->incall->rstat != NoStatus);
@@ -2745,7 +2762,7 @@ exitScheduler (bool wait_foreign USED_IF_THREADS)
// debugBelch("n_failed_trygrab_idles = %d, n_idle_caps = %d\n",
// n_failed_trygrab_idles, n_idle_caps);
- boundTaskExiting(task);
+ exitMyTask();
}
void
@@ -2804,7 +2821,7 @@ performGC_(bool force_major)
waitForCapability(&cap,task);
scheduleDoGC(&cap,task,force_major,false);
releaseCapability(cap);
- boundTaskExiting(task);
+ exitMyTask();
}
void
diff --git a/rts/Task.c b/rts/Task.c
index 11ba5f1581..e5963dccc6 100644
--- a/rts/Task.c
+++ b/rts/Task.c
@@ -118,7 +118,7 @@ freeTaskManager (void)
return tasksRunning;
}
-Task* getTask (void)
+Task* getMyTask (void)
{
Task *task;
@@ -306,7 +306,7 @@ newBoundTask (void)
stg_exit(EXIT_FAILURE);
}
- task = getTask();
+ task = getMyTask();
task->stopped = false;
@@ -317,13 +317,12 @@ newBoundTask (void)
}
void
-boundTaskExiting (Task *task)
+exitMyTask (void)
{
+ Task* task = myTask();
#if defined(THREADED_RTS)
ASSERT(osThreadId() == task->id);
#endif
- ASSERT(myTask() == task);
-
endInCall(task);
// Set task->stopped, but only if this is the last call (#4850).
@@ -524,7 +523,7 @@ void rts_setInCallCapability (
int preferred_capability,
int affinity USED_IF_THREADS)
{
- Task *task = getTask();
+ Task *task = getMyTask();
task->preferred_capability = preferred_capability;
#if defined(THREADED_RTS)
@@ -541,7 +540,7 @@ void rts_pinThreadToNumaNode (
{
#if defined(THREADED_RTS)
if (RtsFlags.GcFlags.numa) {
- Task *task = getTask();
+ Task *task = getMyTask();
task->node = capNoToNumaNode(node);
if (!DEBUG_IS_ON || !RtsFlags.DebugFlags.numa) { // faking NUMA
setThreadNode(numa_map[task->node]);
diff --git a/rts/Task.h b/rts/Task.h
index 17bcbe2da4..6e366a5d9b 100644
--- a/rts/Task.h
+++ b/rts/Task.h
@@ -149,8 +149,8 @@ typedef struct Task_ {
struct InCall_ *spare_incalls;
bool worker; // == true if this is a worker Task
- bool stopped; // == true between newBoundTask and
- // boundTaskExiting, or in a worker Task.
+ bool stopped; // == false between newBoundTask and
+ // exitMyTask, or in a worker Task.
// So that we can detect when a finalizer illegally calls back into Haskell
bool running_finalizers;
@@ -200,9 +200,9 @@ extern Mutex all_tasks_mutex;
void initTaskManager (void);
uint32_t freeTaskManager (void);
-// Create a new Task for a bound thread. This Task must be released
-// by calling boundTaskExiting. The Task is cached in
-// thread-local storage and will remain even after boundTaskExiting()
+// Create a new Task for a bound thread. This Task must be released
+// by calling exitMyTask(). The Task is cached in
+// thread-local storage and will remain even after exitMyTask()
// has been called; to free the memory, see freeMyTask().
//
Task* newBoundTask (void);
@@ -210,11 +210,10 @@ Task* newBoundTask (void);
// Return the current OS thread's Task, which is created if it doesn't already
// exist. After you have finished using RTS APIs, you should call freeMyTask()
// to release this thread's Task.
-Task* getTask (void);
+Task* getMyTask (void);
-// The current task is a bound task that is exiting.
-//
-void boundTaskExiting (Task *task);
+// Exit myTask - This is the counterpart of newBoundTask().
+void exitMyTask (void);
// Free a Task if one was previously allocated by newBoundTask().
// This is not necessary unless the thread that called newBoundTask()
diff --git a/rts/sm/NonMoving.c b/rts/sm/NonMoving.c
index 388ceae2fd..05f8481fe2 100644
--- a/rts/sm/NonMoving.c
+++ b/rts/sm/NonMoving.c
@@ -1215,7 +1215,7 @@ static void nonmovingMark_(MarkQueue *mark_queue, StgWeak **dead_weaks, StgTSO *
#if defined(THREADED_RTS)
finish:
- boundTaskExiting(task);
+ exitMyTask();
// We are done...
mark_thread = 0;