-rw-r--r--   rts/RtsStartup.c                              |   4
-rw-r--r--   rts/Trace.h                                   |  14
-rw-r--r--   rts/eventlog/EventLog.c                       | 134
-rw-r--r--   rts/eventlog/EventLog.h                       |   3
-rw-r--r--   testsuite/tests/rts/InitEventLogging.stdout   |   1
5 files changed, 136 insertions, 20 deletions
diff --git a/rts/RtsStartup.c b/rts/RtsStartup.c
index 818fbaa346..bd8e5d5733 100644
--- a/rts/RtsStartup.c
+++ b/rts/RtsStartup.c
@@ -507,6 +507,10 @@ hs_exit_(bool wait_foreign)
     // also outputs the stats (+RTS -s) info.
     exitStorage();
 
+    /* flush and clean up capabilities' eventlog buffers before cleaning up
+     * the scheduler */
+    finishCapEventLogging();
+
     /* free the tasks */
     freeScheduler();
 
diff --git a/rts/Trace.h b/rts/Trace.h
index cad83363d3..08b42fe9bd 100644
--- a/rts/Trace.h
+++ b/rts/Trace.h
@@ -8,7 +8,7 @@
 
 #pragma once
 
-#include "rts/EventLogFormat.h"
+#include "eventlog/EventLog.h"
 #include "sm/NonMovingCensus.h"
 #include "Capability.h"
 
@@ -616,6 +616,18 @@ INLINE_HEADER void traceCapDisable(Capability *cap STG_UNUSED)
 {
     traceCapEvent(cap, EVENT_CAP_DISABLE);
     dtraceCapDisable((EventCapNo)cap->no);
+
+    // Ensure that the eventlog buffer is flushed since otherwise its events
+    // may never make it to the output stream.
+    // See Note [Eventlog concurrency].
+#if defined(TRACING)
+    if (eventlog_enabled) {
+        flushLocalEventsBuf(cap);
+    }
+#else
+    flushLocalEventsBuf(cap);
+#endif
+
 }
 
 INLINE_HEADER void traceEventThreadWakeup(Capability *cap STG_UNUSED,
diff --git a/rts/eventlog/EventLog.c b/rts/eventlog/EventLog.c
index ddd2cc02df..876f1ff7b6 100644
--- a/rts/eventlog/EventLog.c
+++ b/rts/eventlog/EventLog.c
@@ -27,7 +27,65 @@
 #include <unistd.h>
 #endif
 
-bool eventlog_enabled;
+Mutex state_change_mutex;
+bool eventlog_enabled; // protected by state_change_mutex to ensure
+                       // serialisation of calls to
+                       // startEventLogging/endEventLogging
+
+/* Note [Eventlog concurrency]
+ * ~~~~~~~~~~~~~~~~~~~~~~~~~~~
+ * The eventlog is designed to handle high rates of concurrent event posting by
+ * multiple capabilities. For this reason, each capability has its own local
+ * event buffer, which is flushed when filled.
+ *
+ * Additionally, there is a single global event buffer (`eventBuf`), which is
+ * used for various administrative things (e.g. posting the header) and in
+ * cases where we want to post events yet don't hold a capability.
+ *
+ * Whether or not events are posted is determined by the global flag
+ * eventlog_enabled. Naturally, starting and stopping of logging are a bit
+ * subtle. In particular, we need to ensure that the header is the *first*
+ * thing to appear in the event stream. To ensure that multiple threads don't
+ * race to start/stop logging we protect eventlog_enabled with
+ * state_change_mutex. Moreover, we only set eventlog_enabled *after* we have
+ * posted the header.
+ *
+ * Event buffers uphold the invariant that they always begin with a start-block
+ * marker. This is enforced by calls to postBlockMarker in:
+ *
+ *  - initEventsBuf (during buffer initialization)
+ *  - printAndClearEventBuf (after the buffer is filled)
+ *  - moreCapEventBufs (when the number of capabilities changes)
+ *
+ * The one place where we *don't* want a block marker is when posting the
+ * eventlog header. We achieve this by first resetting the eventlog buffer
+ * before posting the header (in postHeaderEvents).
+ *
+ * Stopping eventlogging is a bit involved:
+ *
+ *  1. first take state_change_mutex, to ensure we don't race with another
+ *     thread to stop logging.
+ *  2. disable eventlog_enabled, to ensure that no capabilities post further
+ *     events
+ *  3. request that all capabilities flush their eventlog buffers. This
+ *     achieves two things: (a) ensures that all events make it to the output
+ *     stream, and (b) serves as a memory barrier, ensuring that all
+ *     capabilities see that eventlogging is now disabled
+ *  4. wait until all capabilities have flushed.
+ *  5. post the end-of-data marker
+ *  6. stop the writer
+ *  7. release state_change_mutex
+ *
+ * Note that a corollary of this is that !eventlog_enabled implies that the
+ * eventlog buffers are all empty (modulo the block marker that all buffers
+ * always have).
+ *
+ * Changing the number of capabilities is fairly straightforward since we hold
+ * all capabilities when the capability count is changed. The one slight corner
+ * case is that we must ensure that the buffers of any disabled capabilities
+ * are flushed, lest their events be stuck in limbo. This is achieved with a
+ * call to flushLocalEventsBuf in traceCapDisable.
+ */
 
 static const EventLogWriter *event_log_writer = NULL;
 
@@ -36,6 +94,8 @@ static const EventLogWriter *event_log_writer = NULL;
 static int flushCount;
 
 // Struct for record keeping of buffer to store event types and events.
+//
+// Invariant: The event buffer will always begin with a block-start marker.
 typedef struct _EventsBuf {
   StgInt8 *begin;
   StgInt8 *pos;
@@ -512,6 +572,10 @@ init_event_types(void)
 static void
 postHeaderEvents(void)
 {
+    // The header must appear first in the output stream, without the
+    // block start marker we previously added in printAndClearEventBuf.
+    resetEventsBuf(&eventBuf);
+
     // Write in buffer: the header begin marker.
     postInt32(&eventBuf, EVENT_HEADER_BEGIN);
 
@@ -571,6 +635,7 @@ initEventLogging()
     initEventsBuf(&eventBuf, EVENT_LOG_SIZE, (EventCapNo)(-1));
 #if defined(THREADED_RTS)
     initMutex(&eventBufMutex);
+    initMutex(&state_change_mutex);
 #endif
 }
 
@@ -589,31 +654,39 @@ startEventLogging_(void)
 {
     initEventLogWriter();
 
+    ACQUIRE_LOCK(&eventBufMutex);
     postHeaderEvents();
 
-    // Flush capEventBuf with header.
     /*
      * Flush header and data begin marker to the file, thus preparing the
      * file to have events written to it.
     */
     printAndClearEventBuf(&eventBuf);
 
-    for (uint32_t c = 0; c < get_n_capabilities(); ++c) {
-        postBlockMarker(&capEventBuf[c]);
-    }
+    RELEASE_LOCK(&eventBufMutex);
+
     return true;
 }
 
 bool
 startEventLogging(const EventLogWriter *ev_writer)
 {
+    // Fail early if we race with another thread.
+    if (TRY_ACQUIRE_LOCK(&state_change_mutex) != 0) {
+        return false;
+    }
+
+    // Check whether eventlogging has already been enabled.
     if (eventlog_enabled || event_log_writer) {
+        RELEASE_LOCK(&state_change_mutex);
         return false;
     }
 
-    eventlog_enabled = true;
     event_log_writer = ev_writer;
-    return startEventLogging_();
+    bool ret = startEventLogging_();
+    eventlog_enabled = true;
+    RELEASE_LOCK(&state_change_mutex);
+    return ret;
 }
 
 // Called during forkProcess in the child to restart the eventlog writer.
@@ -628,18 +701,42 @@ restartEventLogging(void)
     }
 }
 
+// Flush and free capability eventlog buffers in preparation for RTS shutdown.
+void
+finishCapEventLogging(void)
+{
+    if (eventlog_enabled) {
+        // Flush all events remaining in the capabilities' buffers and free them.
+        // N.B. at this point we hold all capabilities.
+        for (uint32_t c = 0; c < n_capabilities; ++c) {
+            if (capEventBuf[c].begin != NULL) {
+                printAndClearEventBuf(&capEventBuf[c]);
+                stgFree(capEventBuf[c].begin);
+            }
+        }
+    }
+}
+
 void
 endEventLogging(void)
 {
-    if (!eventlog_enabled)
+    ACQUIRE_LOCK(&state_change_mutex);
+    if (!eventlog_enabled) {
+        RELEASE_LOCK(&state_change_mutex);
         return;
+    }
+
+    eventlog_enabled = false;
 
     // Flush all events remaining in the buffers.
-    for (uint32_t c = 0; c < n_capabilities; ++c) {
-        printAndClearEventBuf(&capEventBuf[c]);
+    //
+    // N.B. Don't flush if shutting down: this was done in
+    // finishCapEventLogging and the capabilities have already been freed.
+    if (sched_state != SCHED_SHUTTING_DOWN) {
+        flushEventLog(NULL);
     }
-    printAndClearEventBuf(&eventBuf);
-    resetEventsBuf(&eventBuf); // we don't want the block marker
+
+    ACQUIRE_LOCK(&eventBufMutex);
 
     // Mark end of events (data).
     postEventTypeNum(&eventBuf, EVENT_DATA_END);
@@ -647,11 +744,15 @@ endEventLogging(void)
     // Flush the end of data marker.
     printAndClearEventBuf(&eventBuf);
 
+    RELEASE_LOCK(&eventBufMutex);
+
     stopEventLogWriter();
     event_log_writer = NULL;
-    eventlog_enabled = false;
+
+    RELEASE_LOCK(&state_change_mutex);
 }
 
+/* N.B. we hold all capabilities when this is called */
 void
 moreCapEventBufs (uint32_t from, uint32_t to)
 {
@@ -663,6 +764,7 @@ moreCapEventBufs (uint32_t from, uint32_t to)
                                "moreCapEventBufs");
     }
 
+    // Initialize buffers for new capabilities
     for (uint32_t c = from; c < to; ++c) {
         initEventsBuf(&capEventBuf[c], EVENT_LOG_SIZE, c);
     }
@@ -680,11 +782,6 @@ moreCapEventBufs (uint32_t from, uint32_t to)
 void
 freeEventLogging(void)
 {
-    // Free events buffer.
-    for (uint32_t c = 0; c < n_capabilities; ++c) {
-        if (capEventBuf[c].begin != NULL)
-            stgFree(capEventBuf[c].begin);
-    }
     if (capEventBuf != NULL) {
         stgFree(capEventBuf);
     }
@@ -1571,6 +1668,7 @@ void initEventsBuf(EventsBuf* eb, StgWord64 size, EventCapNo capno)
     eb->size = size;
     eb->marker = NULL;
     eb->capno = capno;
+    postBlockMarker(eb);
 }
 
 void resetEventsBuf(EventsBuf* eb)
diff --git a/rts/eventlog/EventLog.h b/rts/eventlog/EventLog.h
index e78db809c7..a412b491bb 100644
--- a/rts/eventlog/EventLog.h
+++ b/rts/eventlog/EventLog.h
@@ -26,6 +26,7 @@ extern bool eventlog_enabled;
 
 void initEventLogging(void);
 void restartEventLogging(void);
+void finishCapEventLogging(void);
 void freeEventLogging(void);
 void abortEventLogging(void); // #4512 - after fork child needs to abort
 void moreCapEventBufs (uint32_t from, uint32_t to);
@@ -182,6 +183,8 @@ void postTickyCounterSamples(StgEntCounter *p);
 
 #else /* !TRACING */
 
+INLINE_HEADER void finishCapEventLogging(void) {}
+
 INLINE_HEADER void flushLocalEventsBuf(Capability *cap STG_UNUSED)
 { /* nothing */ }
 
diff --git a/testsuite/tests/rts/InitEventLogging.stdout b/testsuite/tests/rts/InitEventLogging.stdout
index ef5ce6f2d3..2729aa1c32 100644
--- a/testsuite/tests/rts/InitEventLogging.stdout
+++ b/testsuite/tests/rts/InitEventLogging.stdout
@@ -3,5 +3,4 @@ init
 write
 write
 write
-write
 stop
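
Note [Eventlog concurrency] above compresses the start/stop handshake into seven steps. The standalone model below replays just that handshake with POSIX threads so the ordering can be read, and run, in isolation. The names, the printed placeholders, and the use of pthread_mutex_trylock in place of the RTS's Mutex/TRY_ACQUIRE_LOCK wrappers are all assumptions of this sketch, not RTS code.

/* Standalone model of the start/stop serialisation in Note [Eventlog
 * concurrency]; names and strings are invented for illustration. */
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

static pthread_mutex_t state_change_mutex = PTHREAD_MUTEX_INITIALIZER;
static atomic_bool logging_enabled = false;   /* posters check this flag */

static bool start_logging(void)
{
    /* Fail early if another thread is already starting/stopping (step 1). */
    if (pthread_mutex_trylock(&state_change_mutex) != 0)
        return false;
    if (atomic_load(&logging_enabled)) {
        pthread_mutex_unlock(&state_change_mutex);
        return false;
    }
    printf("header\n");                     /* header goes out first ...      */
    atomic_store(&logging_enabled, true);   /* ... only then do posters see it */
    pthread_mutex_unlock(&state_change_mutex);
    return true;
}

static void stop_logging(void)
{
    pthread_mutex_lock(&state_change_mutex);       /* step 1 */
    if (atomic_load(&logging_enabled)) {
        atomic_store(&logging_enabled, false);     /* step 2: no new events   */
        printf("flush per-capability buffers\n");  /* steps 3-4               */
        printf("end-of-data marker\n");            /* step 5                  */
        printf("stop writer\n");                   /* step 6                  */
    }
    pthread_mutex_unlock(&state_change_mutex);     /* step 7 */
}

int main(void)
{
    printf("started: %d\n", start_logging());
    printf("started again: %d\n", start_logging()); /* refused: already enabled */
    stop_logging();
    return 0;
}

The key property the model shares with the patch is that logging_enabled only becomes true after the header has been emitted, so no poster can ever get an event in front of it.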
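The other invariant the patch leans on is that every events buffer begins with a block marker, except when the header is posted, which is why postHeaderEvents now resets eventBuf first while initEventsBuf and printAndClearEventBuf re-post a marker. The toy model below only makes that ordering visible; the Buf type and the bracketed strings are invented and bear no relation to the real EventsBuf layout.

/* Toy model of the block-marker invariant; not the RTS's EventsBuf. */
#include <stdio.h>
#include <string.h>

typedef struct { char data[256]; size_t len; } Buf;

static void reset(Buf *b) { b->len = 0; }

static void post(Buf *b, const char *s)
{
    size_t n = strlen(s);
    if (b->len + n <= sizeof b->data) {   /* toy bounds check only */
        memcpy(b->data + b->len, s, n);
        b->len += n;
    }
}

/* init and flush both re-establish the invariant: buffer starts with a marker */
static void init_buf(Buf *b)  { reset(b); post(b, "[block]"); }
static void flush_buf(Buf *b) { printf("%.*s\n", (int)b->len, b->data); init_buf(b); }

/* the header is the one payload that must not sit behind a block marker */
static void post_header(Buf *b) { reset(b); post(b, "[header]"); }

int main(void)
{
    Buf global;
    init_buf(&global);

    post_header(&global);   /* header first, with no leading block marker */
    flush_buf(&global);     /* prints "[header]", then restores "[block]" */

    post(&global, "[event]");
    flush_buf(&global);     /* prints "[block][event]" */
    return 0;
}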
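Finally, the last hunk trims one "write" line from InitEventLogging.stdout, a test that drives the eventlog through a user-supplied EventLogWriter; with the patch, one fewer buffer flush reaches the writer during start-up. The sketch below is such a client in the same spirit, but it is not the testsuite source: the callback field names follow GHC's public rts/EventLogWriter.h, while the main program, the printed strings, and the assumption that startEventLogging/endEventLogging are callable from foreign code linked against a tracing-capable RTS are illustrative only.

/* Hedged sketch of an EventLogWriter client; not the InitEventLogging source. */
#include <stdio.h>
#include "Rts.h"   /* brings in rts/EventLogWriter.h when built against the RTS */

static void my_init(void) { printf("init\n"); }

static bool my_write(void *eventlog, size_t size)
{
    (void)eventlog; (void)size;
    printf("write\n");   /* each flushed buffer arrives as one callback */
    return true;         /* false would signal a write failure to the RTS */
}

static void my_flush(void) { /* nothing buffered on our side */ }

static void my_stop(void) { printf("stop\n"); }

static const EventLogWriter my_writer = {
    .initEventLogWriter = my_init,
    .writeEventLog      = my_write,
    .flushEventLog      = my_flush,
    .stopEventLogWriter = my_stop,
};

int main(int argc, char *argv[])
{
    hs_init(&argc, &argv);

    /* With the patch, a concurrent or repeated start simply returns false
     * instead of racing on eventlog_enabled. */
    if (!startEventLogging(&my_writer)) {
        fprintf(stderr, "eventlog already running or start raced\n");
    }

    endEventLogging();   /* posts the end-of-data marker, then calls my_stop */
    hs_exit();
    return 0;
}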