summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/xray/tests/unit/buffer_queue_test.cc116
-rw-r--r--lib/xray/xray_buffer_queue.cc150
-rw-r--r--lib/xray/xray_buffer_queue.h21
-rw-r--r--lib/xray/xray_fdr_logging.cc8
4 files changed, 62 insertions, 233 deletions
diff --git a/lib/xray/tests/unit/buffer_queue_test.cc b/lib/xray/tests/unit/buffer_queue_test.cc
index 8aa366a20..c0d4ccb26 100644
--- a/lib/xray/tests/unit/buffer_queue_test.cc
+++ b/lib/xray/tests/unit/buffer_queue_test.cc
@@ -13,9 +13,7 @@
#include "xray_buffer_queue.h"
#include "gtest/gtest.h"
-#include <atomic>
#include <future>
-#include <thread>
#include <unistd.h>
namespace __xray {
@@ -57,7 +55,6 @@ TEST(BufferQueueTest, ReleaseUnknown) {
BufferQueue::Buffer Buf;
Buf.Data = reinterpret_cast<void *>(0xdeadbeef);
Buf.Size = kSize;
- Buf.Generation = Buffers.generation();
EXPECT_EQ(BufferQueue::ErrorCode::UnrecognizedBuffer,
Buffers.releaseBuffer(Buf));
}
@@ -73,7 +70,8 @@ TEST(BufferQueueTest, ErrorsWhenFinalising) {
BufferQueue::Buffer OtherBuf;
ASSERT_EQ(BufferQueue::ErrorCode::QueueFinalizing,
Buffers.getBuffer(OtherBuf));
- ASSERT_EQ(BufferQueue::ErrorCode::QueueFinalizing, Buffers.finalize());
+ ASSERT_EQ(BufferQueue::ErrorCode::QueueFinalizing,
+ Buffers.finalize());
ASSERT_EQ(Buffers.releaseBuffer(Buf), BufferQueue::ErrorCode::Ok);
}
@@ -113,114 +111,4 @@ TEST(BufferQueueTest, Apply) {
ASSERT_EQ(Count, 10);
}
-TEST(BufferQueueTest, GenerationalSupport) {
- bool Success = false;
- BufferQueue Buffers(kSize, 10, Success);
- ASSERT_TRUE(Success);
- BufferQueue::Buffer B0;
- ASSERT_EQ(Buffers.getBuffer(B0), BufferQueue::ErrorCode::Ok);
- ASSERT_EQ(Buffers.finalize(),
- BufferQueue::ErrorCode::Ok); // No more new buffers.
-
- // Re-initialise the queue.
- ASSERT_EQ(Buffers.init(kSize, 10), BufferQueue::ErrorCode::Ok);
-
- BufferQueue::Buffer B1;
- ASSERT_EQ(Buffers.getBuffer(B1), BufferQueue::ErrorCode::Ok);
-
- // Validate that the buffers come from different generations.
- ASSERT_NE(B0.Generation, B1.Generation);
-
- // We stash the current generation, for use later.
- auto PrevGen = B1.Generation;
-
- // At this point, we want to ensure that we can return the buffer from the
- // first "generation" would still be accepted in the new generation...
- EXPECT_EQ(Buffers.releaseBuffer(B0), BufferQueue::ErrorCode::Ok);
-
- // ... and that the new buffer is also accepted.
- EXPECT_EQ(Buffers.releaseBuffer(B1), BufferQueue::ErrorCode::Ok);
-
- // A next round will do the same, ensure that we are able to do multiple
- // rounds in this case.
- ASSERT_EQ(Buffers.finalize(), BufferQueue::ErrorCode::Ok);
- ASSERT_EQ(Buffers.init(kSize, 10), BufferQueue::ErrorCode::Ok);
- EXPECT_EQ(Buffers.getBuffer(B0), BufferQueue::ErrorCode::Ok);
- EXPECT_EQ(Buffers.getBuffer(B1), BufferQueue::ErrorCode::Ok);
-
- // Here we ensure that the generation is different from the previous
- // generation.
- EXPECT_NE(B0.Generation, PrevGen);
- EXPECT_EQ(B1.Generation, B1.Generation);
- ASSERT_EQ(Buffers.finalize(), BufferQueue::ErrorCode::Ok);
- EXPECT_EQ(Buffers.releaseBuffer(B0), BufferQueue::ErrorCode::Ok);
- EXPECT_EQ(Buffers.releaseBuffer(B1), BufferQueue::ErrorCode::Ok);
-}
-
-TEST(BufferQueueTest, GenerationalSupportAcrossThreads) {
- bool Success = false;
- BufferQueue Buffers(kSize, 10, Success);
- ASSERT_TRUE(Success);
-
- std::atomic<int> Counter{0};
-
- // This function allows us to use thread-local storage to isolate the
- // instances of the buffers to be used. It also allows us signal the threads
- // of a new generation, and allow those to get new buffers. This is
- // representative of how we expect the buffer queue to be used by the XRay
- // runtime.
- auto Process = [&] {
- thread_local BufferQueue::Buffer B;
- ASSERT_EQ(Buffers.getBuffer(B), BufferQueue::ErrorCode::Ok);
- auto FirstGen = B.Generation;
-
- // Signal that we've gotten a buffer in the thread.
- Counter.fetch_add(1, std::memory_order_acq_rel);
- while (!Buffers.finalizing()) {
- Buffers.releaseBuffer(B);
- Buffers.getBuffer(B);
- }
-
- // Signal that we've exited the get/release buffer loop.
- Counter.fetch_sub(1, std::memory_order_acq_rel);
- if (B.Data != nullptr)
- Buffers.releaseBuffer(B);
-
- // Spin until we find that the Buffer Queue is no longer finalizing.
- while (Buffers.getBuffer(B) != BufferQueue::ErrorCode::Ok)
- ;
-
- // Signal that we've successfully gotten a buffer in the thread.
- Counter.fetch_add(1, std::memory_order_acq_rel);
-
- EXPECT_NE(FirstGen, B.Generation);
- EXPECT_EQ(Buffers.releaseBuffer(B), BufferQueue::ErrorCode::Ok);
-
- // Signal that we've successfully exited.
- Counter.fetch_sub(1, std::memory_order_acq_rel);
- };
-
- // Spawn two threads running Process.
- std::thread T0(Process), T1(Process);
-
- // Spin until we find the counter is up to 2.
- while (Counter.load(std::memory_order_acquire) != 2)
- ;
-
- // Then we finalize, then re-initialize immediately.
- Buffers.finalize();
-
- // Spin until we find the counter is down to 0.
- while (Counter.load(std::memory_order_acquire) != 0)
- ;
-
- // Then we re-initialize.
- EXPECT_EQ(Buffers.init(kSize, 10), BufferQueue::ErrorCode::Ok);
-
- T0.join();
- T1.join();
-
- ASSERT_EQ(Counter.load(std::memory_order_acquire), 0);
-}
-
} // namespace __xray
diff --git a/lib/xray/xray_buffer_queue.cc b/lib/xray/xray_buffer_queue.cc
index c17138d99..5a88ecd33 100644
--- a/lib/xray/xray_buffer_queue.cc
+++ b/lib/xray/xray_buffer_queue.cc
@@ -24,85 +24,58 @@
using namespace __xray;
using namespace __sanitizer;
-BufferQueue::ErrorCode BufferQueue::init(size_t BS, size_t BC) {
- SpinMutexLock Guard(&Mutex);
-
- if (!finalizing())
- return BufferQueue::ErrorCode::AlreadyInitialized;
-
- bool Success = false;
- BufferSize = BS;
- BufferCount = BC;
- BackingStore = allocateBuffer(BufferSize * BufferCount);
- if (BackingStore == nullptr)
- return BufferQueue::ErrorCode::NotEnoughMemory;
-
- auto CleanupBackingStore = __sanitizer::at_scope_exit([&, this] {
- if (Success)
- return;
- deallocateBuffer(BackingStore, BufferSize * BufferCount);
- });
-
- Buffers = initArray<BufferRep>(BufferCount);
- if (Buffers == nullptr)
- return BufferQueue::ErrorCode::NotEnoughMemory;
-
- // At this point we increment the generation number to associate the buffers
- // to the new generation.
- atomic_fetch_add(&Generation, 1, memory_order_acq_rel);
-
- Success = true;
- for (size_t i = 0; i < BufferCount; ++i) {
- auto &T = Buffers[i];
- auto &Buf = T.Buff;
- atomic_store(&Buf.Extents, 0, memory_order_release);
- Buf.Generation = generation();
- Buf.Data = reinterpret_cast<char *>(BackingStore) + (BufferSize * i);
- Buf.Size = BufferSize;
- T.Used = false;
- }
-
- Next = Buffers;
- First = Buffers;
- LiveBuffers = 0;
- atomic_store(&Finalizing, 0, memory_order_release);
- return BufferQueue::ErrorCode::Ok;
-}
-
BufferQueue::BufferQueue(size_t B, size_t N,
bool &Success) XRAY_NEVER_INSTRUMENT
: BufferSize(B),
BufferCount(N),
Mutex(),
- Finalizing{1},
- BackingStore(nullptr),
- Buffers(nullptr),
+ Finalizing{0},
+ BackingStore(allocateBuffer(B *N)),
+ Buffers(initArray<BufferQueue::BufferRep>(N)),
Next(Buffers),
First(Buffers),
- LiveBuffers(0),
- Generation{0} {
- Success = init(B, N) == BufferQueue::ErrorCode::Ok;
+ LiveBuffers(0) {
+ if (BackingStore == nullptr) {
+ Success = false;
+ return;
+ }
+ if (Buffers == nullptr) {
+ deallocateBuffer(BackingStore, BufferSize * BufferCount);
+ Success = false;
+ return;
+ }
+
+ for (size_t i = 0; i < N; ++i) {
+ auto &T = Buffers[i];
+ auto &Buf = T.Buff;
+ Buf.Data = reinterpret_cast<char *>(BackingStore) + (BufferSize * i);
+ Buf.Size = B;
+ atomic_store(&Buf.Extents, 0, memory_order_release);
+ T.Used = false;
+ }
+ Success = true;
}
BufferQueue::ErrorCode BufferQueue::getBuffer(Buffer &Buf) {
if (atomic_load(&Finalizing, memory_order_acquire))
return ErrorCode::QueueFinalizing;
- BufferRep *B = nullptr;
- {
- SpinMutexLock Guard(&Mutex);
- if (LiveBuffers == BufferCount)
- return ErrorCode::NotEnoughMemory;
- B = Next++;
- if (Next == (Buffers + BufferCount))
- Next = Buffers;
- ++LiveBuffers;
- }
+ SpinMutexLock Guard(&Mutex);
+ if (LiveBuffers == BufferCount)
+ return ErrorCode::NotEnoughMemory;
+
+ auto &T = *Next;
+ auto &B = T.Buff;
+ auto Extents = atomic_load(&B.Extents, memory_order_acquire);
+ atomic_store(&Buf.Extents, Extents, memory_order_release);
+ Buf.Data = B.Data;
+ Buf.Size = B.Size;
+ T.Used = true;
+ ++LiveBuffers;
+
+ if (++Next == (Buffers + BufferCount))
+ Next = Buffers;
- Buf.Data = B->Buff.Data;
- Buf.Generation = generation();
- Buf.Size = B->Buff.Size;
- B->Used = true;
return ErrorCode::Ok;
}
@@ -111,42 +84,29 @@ BufferQueue::ErrorCode BufferQueue::releaseBuffer(Buffer &Buf) {
// backing store's range.
if (Buf.Data < BackingStore ||
Buf.Data >
- reinterpret_cast<char *>(BackingStore) + (BufferCount * BufferSize)) {
- if (Buf.Generation != generation()) {
- Buf.Data = nullptr;
- Buf.Size = 0;
- Buf.Generation = 0;
- return BufferQueue::ErrorCode::Ok;
- }
- return BufferQueue::ErrorCode::UnrecognizedBuffer;
- }
+ reinterpret_cast<char *>(BackingStore) + (BufferCount * BufferSize))
+ return ErrorCode::UnrecognizedBuffer;
- BufferRep *B = nullptr;
- {
- SpinMutexLock Guard(&Mutex);
-
- // This points to a semantic bug, we really ought to not be releasing more
- // buffers than we actually get.
- if (LiveBuffers == 0)
- return ErrorCode::NotEnoughMemory;
+ SpinMutexLock Guard(&Mutex);
- --LiveBuffers;
- B = First++;
- if (First == (Buffers + BufferCount))
- First = Buffers;
- }
+ // This points to a semantic bug, we really ought to not be releasing more
+ // buffers than we actually get.
+ if (LiveBuffers == 0)
+ return ErrorCode::NotEnoughMemory;
// Now that the buffer has been released, we mark it as "used".
- B->Buff.Data = Buf.Data;
- B->Buff.Size = Buf.Size;
- B->Buff.Generation = Buf.Generation;
- B->Used = true;
- atomic_store(&B->Buff.Extents,
- atomic_load(&Buf.Extents, memory_order_acquire),
- memory_order_release);
+ auto Extents = atomic_load(&Buf.Extents, memory_order_acquire);
+ atomic_store(&First->Buff.Extents, Extents, memory_order_release);
+ First->Buff.Data = Buf.Data;
+ First->Buff.Size = Buf.Size;
+ First->Used = true;
Buf.Data = nullptr;
Buf.Size = 0;
- Buf.Generation = 0;
+ atomic_store(&Buf.Extents, 0, memory_order_release);
+ --LiveBuffers;
+ if (++First == (Buffers + BufferCount))
+ First = Buffers;
+
return ErrorCode::Ok;
}
diff --git a/lib/xray/xray_buffer_queue.h b/lib/xray/xray_buffer_queue.h
index cbd42835f..c1fa9fab7 100644
--- a/lib/xray/xray_buffer_queue.h
+++ b/lib/xray/xray_buffer_queue.h
@@ -33,7 +33,6 @@ class BufferQueue {
public:
struct Buffer {
atomic_uint64_t Extents{0};
- uint64_t Generation{0};
void *Data = nullptr;
size_t Size = 0;
};
@@ -131,10 +130,6 @@ private:
// Count of buffers that have been handed out through 'getBuffer'.
size_t LiveBuffers;
- // We use a generation number to identify buffers and which generation they're
- // associated with.
- atomic_uint64_t Generation;
-
public:
enum class ErrorCode : unsigned {
Ok,
@@ -142,7 +137,6 @@ public:
QueueFinalizing,
UnrecognizedBuffer,
AlreadyFinalized,
- AlreadyInitialized,
};
static const char *getErrorString(ErrorCode E) {
@@ -157,8 +151,6 @@ public:
return "buffer being returned not owned by buffer queue";
case ErrorCode::AlreadyFinalized:
return "queue already finalized";
- case ErrorCode::AlreadyInitialized:
- return "queue already initialized";
}
return "unknown error";
}
@@ -189,23 +181,10 @@ public:
/// the buffer being released.
ErrorCode releaseBuffer(Buffer &Buf);
- /// Initializes the buffer queue, starting a new generation. We can re-set the
- /// size of buffers with |BS| along with the buffer count with |BC|.
- ///
- /// Returns:
- /// - ErrorCode::Ok when we successfully initialize the buffer. This
- /// requires that the buffer queue is previously finalized.
- /// - ErrorCode::AlreadyInitialized when the buffer queue is not finalized.
- ErrorCode init(size_t BS, size_t BC);
-
bool finalizing() const {
return atomic_load(&Finalizing, memory_order_acquire);
}
- uint64_t generation() const {
- return atomic_load(&Generation, memory_order_acquire);
- }
-
/// Returns the configured size of the buffers in the buffer queue.
size_t ConfiguredBufferSize() const { return BufferSize; }
diff --git a/lib/xray/xray_fdr_logging.cc b/lib/xray/xray_fdr_logging.cc
index 2479a0fa7..32188771c 100644
--- a/lib/xray/xray_fdr_logging.cc
+++ b/lib/xray/xray_fdr_logging.cc
@@ -1056,7 +1056,8 @@ void fdrLoggingHandleTypedEvent(
endBufferIfFull();
}
-XRayLogInitStatus fdrLoggingInit(size_t, size_t, void *Options,
+XRayLogInitStatus fdrLoggingInit(UNUSED size_t BufferSize,
+ UNUSED size_t BufferMax, void *Options,
size_t OptionsSize) XRAY_NEVER_INSTRUMENT {
if (Options == nullptr)
return XRayLogInitStatus::XRAY_LOG_UNINITIALIZED;
@@ -1103,8 +1104,9 @@ XRayLogInitStatus fdrLoggingInit(size_t, size_t, void *Options,
// environment-variable defined options.
FDRParser.ParseString(static_cast<const char *>(Options));
*fdrFlags() = FDRFlags;
- auto BufferSize = FDRFlags.buffer_size;
- auto BufferMax = FDRFlags.buffer_max;
+ BufferSize = FDRFlags.buffer_size;
+ BufferMax = FDRFlags.buffer_max;
+
bool Success = false;
if (BQ != nullptr) {