summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--rts/RtsStartup.c4
-rw-r--r--rts/Trace.h14
-rw-r--r--rts/eventlog/EventLog.c134
-rw-r--r--rts/eventlog/EventLog.h3
-rw-r--r--testsuite/tests/rts/InitEventLogging.stdout1
5 files changed, 136 insertions, 20 deletions
diff --git a/rts/RtsStartup.c b/rts/RtsStartup.c
index 818fbaa346..bd8e5d5733 100644
--- a/rts/RtsStartup.c
+++ b/rts/RtsStartup.c
@@ -507,6 +507,10 @@ hs_exit_(bool wait_foreign)
// also outputs the stats (+RTS -s) info.
exitStorage();
+ /* flush and clean up capabilities' eventlog buffers before cleaning up
+ * scheduler */
+ finishCapEventLogging();
+
/* free the tasks */
freeScheduler();
diff --git a/rts/Trace.h b/rts/Trace.h
index cad83363d3..08b42fe9bd 100644
--- a/rts/Trace.h
+++ b/rts/Trace.h
@@ -8,7 +8,7 @@
#pragma once
-#include "rts/EventLogFormat.h"
+#include "eventlog/EventLog.h"
#include "sm/NonMovingCensus.h"
#include "Capability.h"
@@ -616,6 +616,18 @@ INLINE_HEADER void traceCapDisable(Capability *cap STG_UNUSED)
{
traceCapEvent(cap, EVENT_CAP_DISABLE);
dtraceCapDisable((EventCapNo)cap->no);
+
+ // Ensure that the eventlog buffer is flushed since otherwise its events
+ // may never make it to the output stream.
+ // See Note [Eventlog concurrency].
+#if defined(TRACING)
+ if (eventlog_enabled) {
+ flushLocalEventsBuf(cap);
+ }
+# else
+ flushLocalEventsBuf(cap);
+#endif
+
}
INLINE_HEADER void traceEventThreadWakeup(Capability *cap STG_UNUSED,
diff --git a/rts/eventlog/EventLog.c b/rts/eventlog/EventLog.c
index ddd2cc02df..876f1ff7b6 100644
--- a/rts/eventlog/EventLog.c
+++ b/rts/eventlog/EventLog.c
@@ -27,7 +27,65 @@
#include <unistd.h>
#endif
-bool eventlog_enabled;
+Mutex state_change_mutex;
+bool eventlog_enabled; // protected by state_change_mutex to ensure
+ // serialisation of calls to
+ // startEventLogging/endEventLogging
+
+/* Note [Eventlog concurrency]
+ * ~~~~~~~~~~~~~~~~~~~~~~~~~~~
+ * The eventlog is designed to handle high rates of concurrent event posting by
+ * multiple capabilities. For this reason, each capability has its own local
+ * event buffer, which is flushed when filled.
+ *
+ * Additionally, there is a single global event buffer (`eventBuf`), which is
+ * used for various administrative things (e.g. posting the header) and in
+ * cases where we want to post events yet don't hold a capability.
+ *
+ * Whether or not events are posted is determined by the global flag
+ * eventlog_enabled. Naturally, starting and stopping of logging are a bit
+ * subtle. In particular, we need to ensure that the header is the *first*
+ * thing to appear in the event stream. To ensure that multiple threads don't
+ * race to start/stop logging we protect eventlog_enabled with
+ * state_change_mutex. Moreover, we only set eventlog_enabled *after* we have
+ * posted the header.
+ *
+ * Event buffers uphold the invariant that they always begin with a start-block
+ * marker. This is enforced by calls to postBlockMarker in:
+ *
+ * - initEventsBufs (during buffer initialization)
+ * - printAndClearEventBuf (after the buffer is filled)
+ * - moreCapEventBufs (when the number of capabilities changes)
+ *
+ * The one place where we *don't* want a block marker is when posting the
+ * eventlog header. We achieve this by first resetting the eventlog buffer
+ * before posting the header (in postHeader).
+ *
+ * Stopping eventlogging is a bit involved:
+ *
+ * 1. first take state_change_mutex, to ensure we don't race with another
+ * thread to stop logging.
+ * 2. disable eventlog_enabled, to ensure that no capabilities post further
+ * events
+ * 3. request that all capabilities flush their eventlog buffers. This
+ * achieves two things: (a) ensures that all events make it to the output
+ * stream, and (b) serves as a memory barrier, ensuring that all
+ * capabilities see that eventlogging is now disabled
+ * 4. wait until all capabilities have flushed.
+ * 5. post the end-of-data marker
+ * 6. stop the writer
+ * 7. release state_change_mutex
+ *
+ * Note that a corollary of this is that !eventlog_enabled implies that the
+ * eventlog buffers are all empty (modulo the block marker that all buffers
+ * always have).
+ *
+ * Changing the number of capabilities is fairly straightforward since we hold
+ * all capabilities when the capability count is changed. The one slight corner
+ * case is that we must ensure that the buffers of any disabled capabilities are
+ * flushed, lest their events be stuck in limbo. This is achieved with a call to
+ * flushLocalEventsBuf in traceCapDisable.
+ */
static const EventLogWriter *event_log_writer = NULL;
@@ -36,6 +94,8 @@ static const EventLogWriter *event_log_writer = NULL;
static int flushCount;
// Struct for record keeping of buffer to store event types and events.
+//
+// Invariant: The event buffer will always begin with a block-start marker.
typedef struct _EventsBuf {
StgInt8 *begin;
StgInt8 *pos;
@@ -512,6 +572,10 @@ init_event_types(void)
static void
postHeaderEvents(void)
{
+ // The header must appear first in the output stream, without the
+ // block start marker we previously added in printAndClearEventBuf.
+ resetEventsBuf(&eventBuf);
+
// Write in buffer: the header begin marker.
postInt32(&eventBuf, EVENT_HEADER_BEGIN);
@@ -571,6 +635,7 @@ initEventLogging()
initEventsBuf(&eventBuf, EVENT_LOG_SIZE, (EventCapNo)(-1));
#if defined(THREADED_RTS)
initMutex(&eventBufMutex);
+ initMutex(&state_change_mutex);
#endif
}
@@ -589,31 +654,39 @@ startEventLogging_(void)
{
initEventLogWriter();
+ ACQUIRE_LOCK(&eventBufMutex);
postHeaderEvents();
- // Flush capEventBuf with header.
/*
* Flush header and data begin marker to the file, thus preparing the
* file to have events written to it.
*/
printAndClearEventBuf(&eventBuf);
- for (uint32_t c = 0; c < get_n_capabilities(); ++c) {
- postBlockMarker(&capEventBuf[c]);
- }
+ RELEASE_LOCK(&eventBufMutex);
+
return true;
}
bool
startEventLogging(const EventLogWriter *ev_writer)
{
+ // Fail early if we race with another thread.
+ if (TRY_ACQUIRE_LOCK(&state_change_mutex) != 0) {
+ return false;
+ }
+
+ // Check whether eventlogging has already been enabled.
if (eventlog_enabled || event_log_writer) {
+ RELEASE_LOCK(&state_change_mutex);
return false;
}
- eventlog_enabled = true;
event_log_writer = ev_writer;
- return startEventLogging_();
+ bool ret = startEventLogging_();
+ eventlog_enabled = true;
+ RELEASE_LOCK(&state_change_mutex);
+ return ret;
}
// Called during forkProcess in the child to restart the eventlog writer.
@@ -628,18 +701,42 @@ restartEventLogging(void)
}
}
+// Flush and free capability eventlog buffers in preparation for RTS shutdown.
+void
+finishCapEventLogging(void)
+{
+ if (eventlog_enabled) {
+ // Flush all events remaining in the capabilities' buffers and free them.
+ // N.B. at this point we hold all capabilities.
+ for (uint32_t c = 0; c < n_capabilities; ++c) {
+ if (capEventBuf[c].begin != NULL) {
+ printAndClearEventBuf(&capEventBuf[c]);
+ stgFree(capEventBuf[c].begin);
+ }
+ }
+ }
+}
+
void
endEventLogging(void)
{
- if (!eventlog_enabled)
+ ACQUIRE_LOCK(&state_change_mutex);
+ if (!eventlog_enabled) {
+ RELEASE_LOCK(&state_change_mutex);
return;
+ }
+
+ eventlog_enabled = false;
// Flush all events remaining in the buffers.
- for (uint32_t c = 0; c < n_capabilities; ++c) {
- printAndClearEventBuf(&capEventBuf[c]);
+ //
+ // N.B. Don't flush if shutting down: this was done in
+ // finishCapEventLogging and the capabilities have already been freed.
+ if (sched_state != SCHED_SHUTTING_DOWN) {
+ flushEventLog(NULL);
}
- printAndClearEventBuf(&eventBuf);
- resetEventsBuf(&eventBuf); // we don't want the block marker
+
+ ACQUIRE_LOCK(&eventBufMutex);
// Mark end of events (data).
postEventTypeNum(&eventBuf, EVENT_DATA_END);
@@ -647,11 +744,15 @@ endEventLogging(void)
// Flush the end of data marker.
printAndClearEventBuf(&eventBuf);
+ RELEASE_LOCK(&eventBufMutex);
+
stopEventLogWriter();
event_log_writer = NULL;
- eventlog_enabled = false;
+
+ RELEASE_LOCK(&state_change_mutex);
}
+/* N.B. we hold all capabilities when this is called */
void
moreCapEventBufs (uint32_t from, uint32_t to)
{
@@ -663,6 +764,7 @@ moreCapEventBufs (uint32_t from, uint32_t to)
"moreCapEventBufs");
}
+ // Initialize buffers for new capabilities
for (uint32_t c = from; c < to; ++c) {
initEventsBuf(&capEventBuf[c], EVENT_LOG_SIZE, c);
}
@@ -680,11 +782,6 @@ moreCapEventBufs (uint32_t from, uint32_t to)
void
freeEventLogging(void)
{
- // Free events buffer.
- for (uint32_t c = 0; c < n_capabilities; ++c) {
- if (capEventBuf[c].begin != NULL)
- stgFree(capEventBuf[c].begin);
- }
if (capEventBuf != NULL) {
stgFree(capEventBuf);
}
@@ -1571,6 +1668,7 @@ void initEventsBuf(EventsBuf* eb, StgWord64 size, EventCapNo capno)
eb->size = size;
eb->marker = NULL;
eb->capno = capno;
+ postBlockMarker(eb);
}
void resetEventsBuf(EventsBuf* eb)
diff --git a/rts/eventlog/EventLog.h b/rts/eventlog/EventLog.h
index e78db809c7..a412b491bb 100644
--- a/rts/eventlog/EventLog.h
+++ b/rts/eventlog/EventLog.h
@@ -26,6 +26,7 @@ extern bool eventlog_enabled;
void initEventLogging(void);
void restartEventLogging(void);
+void finishCapEventLogging(void);
void freeEventLogging(void);
void abortEventLogging(void); // #4512 - after fork child needs to abort
void moreCapEventBufs (uint32_t from, uint32_t to);
@@ -182,6 +183,8 @@ void postTickyCounterSamples(StgEntCounter *p);
#else /* !TRACING */
+INLINE_HEADER void finishCapEventLogging(void) {}
+
INLINE_HEADER void flushLocalEventsBuf(Capability *cap STG_UNUSED)
{ /* nothing */ }
diff --git a/testsuite/tests/rts/InitEventLogging.stdout b/testsuite/tests/rts/InitEventLogging.stdout
index ef5ce6f2d3..2729aa1c32 100644
--- a/testsuite/tests/rts/InitEventLogging.stdout
+++ b/testsuite/tests/rts/InitEventLogging.stdout
@@ -3,5 +3,4 @@ init
write
write
write
-write
stop