diff options
author | Ben Gamari <ben@smart-cactus.org> | 2020-12-18 10:14:09 -0500 |
---|---|---|
committer | Marge Bot <ben+marge-bot@smart-cactus.org> | 2021-03-02 17:29:05 -0500 |
commit | 8188adf0f1a0482c269d1eb6350dd91dddc9ed29 (patch) | |
tree | 89d265d5212f9c5dcbf4db0ac0ab42234d1eb164 | |
parent | 5d7978df2f23ba125adacbe371188f99debdbb43 (diff) | |
download | haskell-8188adf0f1a0482c269d1eb6350dd91dddc9ed29.tar.gz |
eventlog: Fix various races
Previously the eventlog infrastructure had a couple of races that could
pop up when using the startEventLog/endEventLog interfaces. In
particular, stopping and then later restarting logging could result in
data preceding the eventlog header, breaking the integrity of the
stream.
To fix this we rework the invariants regarding the eventlog and
generally tighten up the concurrency control surrounding starting and
stopping of logging.
We also fix an unrelated bug, wherein log events from disabled
capabilities could end up never flushed.
-rw-r--r-- | rts/RtsStartup.c | 4 | ||||
-rw-r--r-- | rts/Trace.h | 14 | ||||
-rw-r--r-- | rts/eventlog/EventLog.c | 134 | ||||
-rw-r--r-- | rts/eventlog/EventLog.h | 3 | ||||
-rw-r--r-- | testsuite/tests/rts/InitEventLogging.stdout | 1 |
5 files changed, 136 insertions, 20 deletions
diff --git a/rts/RtsStartup.c b/rts/RtsStartup.c index 818fbaa346..bd8e5d5733 100644 --- a/rts/RtsStartup.c +++ b/rts/RtsStartup.c @@ -507,6 +507,10 @@ hs_exit_(bool wait_foreign) // also outputs the stats (+RTS -s) info. exitStorage(); + /* flush and clean up capabilities' eventlog buffers before cleaning up + * scheduler */ + finishCapEventLogging(); + /* free the tasks */ freeScheduler(); diff --git a/rts/Trace.h b/rts/Trace.h index cad83363d3..08b42fe9bd 100644 --- a/rts/Trace.h +++ b/rts/Trace.h @@ -8,7 +8,7 @@ #pragma once -#include "rts/EventLogFormat.h" +#include "eventlog/EventLog.h" #include "sm/NonMovingCensus.h" #include "Capability.h" @@ -616,6 +616,18 @@ INLINE_HEADER void traceCapDisable(Capability *cap STG_UNUSED) { traceCapEvent(cap, EVENT_CAP_DISABLE); dtraceCapDisable((EventCapNo)cap->no); + + // Ensure that the eventlog buffer is flushed since otherwise its events + // may never make it to the output stream. + // See Note [Eventlog concurrency]. +#if defined(TRACING) + if (eventlog_enabled) { + flushLocalEventsBuf(cap); + } +# else + flushLocalEventsBuf(cap); +#endif + } INLINE_HEADER void traceEventThreadWakeup(Capability *cap STG_UNUSED, diff --git a/rts/eventlog/EventLog.c b/rts/eventlog/EventLog.c index ddd2cc02df..876f1ff7b6 100644 --- a/rts/eventlog/EventLog.c +++ b/rts/eventlog/EventLog.c @@ -27,7 +27,65 @@ #include <unistd.h> #endif -bool eventlog_enabled; +Mutex state_change_mutex; +bool eventlog_enabled; // protected by state_change_mutex to ensure + // serialisation of calls to + // startEventLogging/endEventLogging + +/* Note [Eventlog concurrency] + * ~~~~~~~~~~~~~~~~~~~~~~~~~~~ + * The eventlog is designed to handle high rates of concurrent event posting by + * multiple capabilities. For this reason, each capability has its own local + * event buffer, which is flushed when filled. + * + * Additionally, there is a single global event buffer (`eventBuf`), which is + * used for various administrative things (e.g. posting the header) and in + * cases where we want to post events yet don't hold a capability. + * + * Whether or not events are posted is determined by the global flag + * eventlog_enabled. Naturally, starting and stopping of logging are a bit + * subtle. In particular, we need to ensure that the header is the *first* + * thing to appear in the event stream. To ensure that multiple threads don't + * race to start/stop logging we protect eventlog_enabled with + * state_change_mutex. Moreover, we only set eventlog_enabled *after* we have + * posted the header. + * + * Event buffers uphold the invariant that they always begin with a start-block + * marker. This is enforced by calls to postBlockMarker in: + * + * - initEventsBufs (during buffer initialization) + * - printAndClearEventBuf (after the buffer is filled) + * - moreCapEventBufs (when the number of capabilities changes) + * + * The one place where we *don't* want a block marker is when posting the + * eventlog header. We achieve this by first resetting the eventlog buffer + * before posting the header (in postHeader). + * + * Stopping eventlogging is a bit involved: + * + * 1. first take state_change_mutex, to ensure we don't race with another + * thread to stop logging. + * 2. disable eventlog_enabled, to ensure that no capabilities post further + * events + * 3. request that all capabilities flush their eventlog buffers. This + * achieves two things: (a) ensures that all events make it to the output + * stream, and (b) serves as a memory barrier, ensuring that all + * capabilities see that eventlogging is now disabled + * 4. wait until all capabilities have flushed. + * 5. post the end-of-data marker + * 6. stop the writer + * 7. release state_change_mutex + * + * Note that a corrollary of this is that !eventlog_enabled implies that the + * eventlog buffers are all empty (modulo the block marker that all buffers + * always have). + * + * Changing the number of capabilities is fairly straightforward since we hold + * all capabilities when the capability count is changed. The one slight corner + * case is that we must ensure that the buffers of any disabled capabilities are + * flushed, lest their events are stuck in limbo. This is achieved with a call to + * flushLocalEventsBuf in traceCapDisable. + */ static const EventLogWriter *event_log_writer = NULL; @@ -36,6 +94,8 @@ static const EventLogWriter *event_log_writer = NULL; static int flushCount; // Struct for record keeping of buffer to store event types and events. +// +// Invariant: The event buffer will always begin with a block-start marker. typedef struct _EventsBuf { StgInt8 *begin; StgInt8 *pos; @@ -512,6 +572,10 @@ init_event_types(void) static void postHeaderEvents(void) { + // The header must appear first in the output stream, without the + // the block start marker we previously added in printAndClearEventBuf. + resetEventsBuf(&eventBuf); + // Write in buffer: the header begin marker. postInt32(&eventBuf, EVENT_HEADER_BEGIN); @@ -571,6 +635,7 @@ initEventLogging() initEventsBuf(&eventBuf, EVENT_LOG_SIZE, (EventCapNo)(-1)); #if defined(THREADED_RTS) initMutex(&eventBufMutex); + initMutex(&state_change_mutex); #endif } @@ -589,31 +654,39 @@ startEventLogging_(void) { initEventLogWriter(); + ACQUIRE_LOCK(&eventBufMutex); postHeaderEvents(); - // Flush capEventBuf with header. /* * Flush header and data begin marker to the file, thus preparing the * file to have events written to it. */ printAndClearEventBuf(&eventBuf); - for (uint32_t c = 0; c < get_n_capabilities(); ++c) { - postBlockMarker(&capEventBuf[c]); - } + RELEASE_LOCK(&eventBufMutex); + return true; } bool startEventLogging(const EventLogWriter *ev_writer) { + // Fail early if we race with another thread. + if (TRY_ACQUIRE_LOCK(&state_change_mutex) != 0) { + return false; + } + + // Check whether eventlogging has already been enabled. if (eventlog_enabled || event_log_writer) { + RELEASE_LOCK(&state_change_mutex); return false; } - eventlog_enabled = true; event_log_writer = ev_writer; - return startEventLogging_(); + bool ret = startEventLogging_(); + eventlog_enabled = true; + RELEASE_LOCK(&state_change_mutex); + return ret; } // Called during forkProcess in the child to restart the eventlog writer. @@ -628,18 +701,42 @@ restartEventLogging(void) } } +// Flush and free capability eventlog buffers in preparation for RTS shutdown. +void +finishCapEventLogging(void) +{ + if (eventlog_enabled) { + // Flush all events remaining in the capabilities' buffers and free them. + // N.B. at this point we hold all capabilities. + for (uint32_t c = 0; c < n_capabilities; ++c) { + if (capEventBuf[c].begin != NULL) { + printAndClearEventBuf(&capEventBuf[c]); + stgFree(capEventBuf[c].begin); + } + } + } +} + void endEventLogging(void) { - if (!eventlog_enabled) + ACQUIRE_LOCK(&state_change_mutex); + if (!eventlog_enabled) { + RELEASE_LOCK(&state_change_mutex); return; + } + + eventlog_enabled = false; // Flush all events remaining in the buffers. - for (uint32_t c = 0; c < n_capabilities; ++c) { - printAndClearEventBuf(&capEventBuf[c]); + // + // N.B. Don't flush if shutting down: this was done in + // finishCapEventLogging and the capabilities have already been freed. + if (sched_state != SCHED_SHUTTING_DOWN) { + flushEventLog(NULL); } - printAndClearEventBuf(&eventBuf); - resetEventsBuf(&eventBuf); // we don't want the block marker + + ACQUIRE_LOCK(&eventBufMutex); // Mark end of events (data). postEventTypeNum(&eventBuf, EVENT_DATA_END); @@ -647,11 +744,15 @@ endEventLogging(void) // Flush the end of data marker. printAndClearEventBuf(&eventBuf); + RELEASE_LOCK(&eventBufMutex); + stopEventLogWriter(); event_log_writer = NULL; - eventlog_enabled = false; + + RELEASE_LOCK(&state_change_mutex); } +/* N.B. we hold all capabilities when this is called */ void moreCapEventBufs (uint32_t from, uint32_t to) { @@ -663,6 +764,7 @@ moreCapEventBufs (uint32_t from, uint32_t to) "moreCapEventBufs"); } + // Initialize buffers for new capabilities for (uint32_t c = from; c < to; ++c) { initEventsBuf(&capEventBuf[c], EVENT_LOG_SIZE, c); } @@ -680,11 +782,6 @@ moreCapEventBufs (uint32_t from, uint32_t to) void freeEventLogging(void) { - // Free events buffer. - for (uint32_t c = 0; c < n_capabilities; ++c) { - if (capEventBuf[c].begin != NULL) - stgFree(capEventBuf[c].begin); - } if (capEventBuf != NULL) { stgFree(capEventBuf); } @@ -1571,6 +1668,7 @@ void initEventsBuf(EventsBuf* eb, StgWord64 size, EventCapNo capno) eb->size = size; eb->marker = NULL; eb->capno = capno; + postBlockMarker(eb); } void resetEventsBuf(EventsBuf* eb) diff --git a/rts/eventlog/EventLog.h b/rts/eventlog/EventLog.h index e78db809c7..a412b491bb 100644 --- a/rts/eventlog/EventLog.h +++ b/rts/eventlog/EventLog.h @@ -26,6 +26,7 @@ extern bool eventlog_enabled; void initEventLogging(void); void restartEventLogging(void); +void finishCapEventLogging(void); void freeEventLogging(void); void abortEventLogging(void); // #4512 - after fork child needs to abort void moreCapEventBufs (uint32_t from, uint32_t to); @@ -182,6 +183,8 @@ void postTickyCounterSamples(StgEntCounter *p); #else /* !TRACING */ +INLINE_HEADER void finishCapEventLogging(void) {} + INLINE_HEADER void flushLocalEventsBuf(Capability *cap STG_UNUSED) { /* nothing */ } diff --git a/testsuite/tests/rts/InitEventLogging.stdout b/testsuite/tests/rts/InitEventLogging.stdout index ef5ce6f2d3..2729aa1c32 100644 --- a/testsuite/tests/rts/InitEventLogging.stdout +++ b/testsuite/tests/rts/InitEventLogging.stdout @@ -3,5 +3,4 @@ init write write write -write stop |