diff options
-rw-r--r-- | lib/xray/tests/unit/buffer_queue_test.cc | 116 | ||||
-rw-r--r-- | lib/xray/xray_buffer_queue.cc | 150 | ||||
-rw-r--r-- | lib/xray/xray_buffer_queue.h | 21 | ||||
-rw-r--r-- | lib/xray/xray_fdr_logging.cc | 8 |
4 files changed, 62 insertions, 233 deletions
diff --git a/lib/xray/tests/unit/buffer_queue_test.cc b/lib/xray/tests/unit/buffer_queue_test.cc index 8aa366a20..c0d4ccb26 100644 --- a/lib/xray/tests/unit/buffer_queue_test.cc +++ b/lib/xray/tests/unit/buffer_queue_test.cc @@ -13,9 +13,7 @@ #include "xray_buffer_queue.h" #include "gtest/gtest.h" -#include <atomic> #include <future> -#include <thread> #include <unistd.h> namespace __xray { @@ -57,7 +55,6 @@ TEST(BufferQueueTest, ReleaseUnknown) { BufferQueue::Buffer Buf; Buf.Data = reinterpret_cast<void *>(0xdeadbeef); Buf.Size = kSize; - Buf.Generation = Buffers.generation(); EXPECT_EQ(BufferQueue::ErrorCode::UnrecognizedBuffer, Buffers.releaseBuffer(Buf)); } @@ -73,7 +70,8 @@ TEST(BufferQueueTest, ErrorsWhenFinalising) { BufferQueue::Buffer OtherBuf; ASSERT_EQ(BufferQueue::ErrorCode::QueueFinalizing, Buffers.getBuffer(OtherBuf)); - ASSERT_EQ(BufferQueue::ErrorCode::QueueFinalizing, Buffers.finalize()); + ASSERT_EQ(BufferQueue::ErrorCode::QueueFinalizing, + Buffers.finalize()); ASSERT_EQ(Buffers.releaseBuffer(Buf), BufferQueue::ErrorCode::Ok); } @@ -113,114 +111,4 @@ TEST(BufferQueueTest, Apply) { ASSERT_EQ(Count, 10); } -TEST(BufferQueueTest, GenerationalSupport) { - bool Success = false; - BufferQueue Buffers(kSize, 10, Success); - ASSERT_TRUE(Success); - BufferQueue::Buffer B0; - ASSERT_EQ(Buffers.getBuffer(B0), BufferQueue::ErrorCode::Ok); - ASSERT_EQ(Buffers.finalize(), - BufferQueue::ErrorCode::Ok); // No more new buffers. - - // Re-initialise the queue. - ASSERT_EQ(Buffers.init(kSize, 10), BufferQueue::ErrorCode::Ok); - - BufferQueue::Buffer B1; - ASSERT_EQ(Buffers.getBuffer(B1), BufferQueue::ErrorCode::Ok); - - // Validate that the buffers come from different generations. - ASSERT_NE(B0.Generation, B1.Generation); - - // We stash the current generation, for use later. - auto PrevGen = B1.Generation; - - // At this point, we want to ensure that we can return the buffer from the - // first "generation" would still be accepted in the new generation... - EXPECT_EQ(Buffers.releaseBuffer(B0), BufferQueue::ErrorCode::Ok); - - // ... and that the new buffer is also accepted. - EXPECT_EQ(Buffers.releaseBuffer(B1), BufferQueue::ErrorCode::Ok); - - // A next round will do the same, ensure that we are able to do multiple - // rounds in this case. - ASSERT_EQ(Buffers.finalize(), BufferQueue::ErrorCode::Ok); - ASSERT_EQ(Buffers.init(kSize, 10), BufferQueue::ErrorCode::Ok); - EXPECT_EQ(Buffers.getBuffer(B0), BufferQueue::ErrorCode::Ok); - EXPECT_EQ(Buffers.getBuffer(B1), BufferQueue::ErrorCode::Ok); - - // Here we ensure that the generation is different from the previous - // generation. - EXPECT_NE(B0.Generation, PrevGen); - EXPECT_EQ(B1.Generation, B1.Generation); - ASSERT_EQ(Buffers.finalize(), BufferQueue::ErrorCode::Ok); - EXPECT_EQ(Buffers.releaseBuffer(B0), BufferQueue::ErrorCode::Ok); - EXPECT_EQ(Buffers.releaseBuffer(B1), BufferQueue::ErrorCode::Ok); -} - -TEST(BufferQueueTest, GenerationalSupportAcrossThreads) { - bool Success = false; - BufferQueue Buffers(kSize, 10, Success); - ASSERT_TRUE(Success); - - std::atomic<int> Counter{0}; - - // This function allows us to use thread-local storage to isolate the - // instances of the buffers to be used. It also allows us signal the threads - // of a new generation, and allow those to get new buffers. This is - // representative of how we expect the buffer queue to be used by the XRay - // runtime. - auto Process = [&] { - thread_local BufferQueue::Buffer B; - ASSERT_EQ(Buffers.getBuffer(B), BufferQueue::ErrorCode::Ok); - auto FirstGen = B.Generation; - - // Signal that we've gotten a buffer in the thread. - Counter.fetch_add(1, std::memory_order_acq_rel); - while (!Buffers.finalizing()) { - Buffers.releaseBuffer(B); - Buffers.getBuffer(B); - } - - // Signal that we've exited the get/release buffer loop. - Counter.fetch_sub(1, std::memory_order_acq_rel); - if (B.Data != nullptr) - Buffers.releaseBuffer(B); - - // Spin until we find that the Buffer Queue is no longer finalizing. - while (Buffers.getBuffer(B) != BufferQueue::ErrorCode::Ok) - ; - - // Signal that we've successfully gotten a buffer in the thread. - Counter.fetch_add(1, std::memory_order_acq_rel); - - EXPECT_NE(FirstGen, B.Generation); - EXPECT_EQ(Buffers.releaseBuffer(B), BufferQueue::ErrorCode::Ok); - - // Signal that we've successfully exited. - Counter.fetch_sub(1, std::memory_order_acq_rel); - }; - - // Spawn two threads running Process. - std::thread T0(Process), T1(Process); - - // Spin until we find the counter is up to 2. - while (Counter.load(std::memory_order_acquire) != 2) - ; - - // Then we finalize, then re-initialize immediately. - Buffers.finalize(); - - // Spin until we find the counter is down to 0. - while (Counter.load(std::memory_order_acquire) != 0) - ; - - // Then we re-initialize. - EXPECT_EQ(Buffers.init(kSize, 10), BufferQueue::ErrorCode::Ok); - - T0.join(); - T1.join(); - - ASSERT_EQ(Counter.load(std::memory_order_acquire), 0); -} - } // namespace __xray diff --git a/lib/xray/xray_buffer_queue.cc b/lib/xray/xray_buffer_queue.cc index c17138d99..5a88ecd33 100644 --- a/lib/xray/xray_buffer_queue.cc +++ b/lib/xray/xray_buffer_queue.cc @@ -24,85 +24,58 @@ using namespace __xray; using namespace __sanitizer; -BufferQueue::ErrorCode BufferQueue::init(size_t BS, size_t BC) { - SpinMutexLock Guard(&Mutex); - - if (!finalizing()) - return BufferQueue::ErrorCode::AlreadyInitialized; - - bool Success = false; - BufferSize = BS; - BufferCount = BC; - BackingStore = allocateBuffer(BufferSize * BufferCount); - if (BackingStore == nullptr) - return BufferQueue::ErrorCode::NotEnoughMemory; - - auto CleanupBackingStore = __sanitizer::at_scope_exit([&, this] { - if (Success) - return; - deallocateBuffer(BackingStore, BufferSize * BufferCount); - }); - - Buffers = initArray<BufferRep>(BufferCount); - if (Buffers == nullptr) - return BufferQueue::ErrorCode::NotEnoughMemory; - - // At this point we increment the generation number to associate the buffers - // to the new generation. - atomic_fetch_add(&Generation, 1, memory_order_acq_rel); - - Success = true; - for (size_t i = 0; i < BufferCount; ++i) { - auto &T = Buffers[i]; - auto &Buf = T.Buff; - atomic_store(&Buf.Extents, 0, memory_order_release); - Buf.Generation = generation(); - Buf.Data = reinterpret_cast<char *>(BackingStore) + (BufferSize * i); - Buf.Size = BufferSize; - T.Used = false; - } - - Next = Buffers; - First = Buffers; - LiveBuffers = 0; - atomic_store(&Finalizing, 0, memory_order_release); - return BufferQueue::ErrorCode::Ok; -} - BufferQueue::BufferQueue(size_t B, size_t N, bool &Success) XRAY_NEVER_INSTRUMENT : BufferSize(B), BufferCount(N), Mutex(), - Finalizing{1}, - BackingStore(nullptr), - Buffers(nullptr), + Finalizing{0}, + BackingStore(allocateBuffer(B *N)), + Buffers(initArray<BufferQueue::BufferRep>(N)), Next(Buffers), First(Buffers), - LiveBuffers(0), - Generation{0} { - Success = init(B, N) == BufferQueue::ErrorCode::Ok; + LiveBuffers(0) { + if (BackingStore == nullptr) { + Success = false; + return; + } + if (Buffers == nullptr) { + deallocateBuffer(BackingStore, BufferSize * BufferCount); + Success = false; + return; + } + + for (size_t i = 0; i < N; ++i) { + auto &T = Buffers[i]; + auto &Buf = T.Buff; + Buf.Data = reinterpret_cast<char *>(BackingStore) + (BufferSize * i); + Buf.Size = B; + atomic_store(&Buf.Extents, 0, memory_order_release); + T.Used = false; + } + Success = true; } BufferQueue::ErrorCode BufferQueue::getBuffer(Buffer &Buf) { if (atomic_load(&Finalizing, memory_order_acquire)) return ErrorCode::QueueFinalizing; - BufferRep *B = nullptr; - { - SpinMutexLock Guard(&Mutex); - if (LiveBuffers == BufferCount) - return ErrorCode::NotEnoughMemory; - B = Next++; - if (Next == (Buffers + BufferCount)) - Next = Buffers; - ++LiveBuffers; - } + SpinMutexLock Guard(&Mutex); + if (LiveBuffers == BufferCount) + return ErrorCode::NotEnoughMemory; + + auto &T = *Next; + auto &B = T.Buff; + auto Extents = atomic_load(&B.Extents, memory_order_acquire); + atomic_store(&Buf.Extents, Extents, memory_order_release); + Buf.Data = B.Data; + Buf.Size = B.Size; + T.Used = true; + ++LiveBuffers; + + if (++Next == (Buffers + BufferCount)) + Next = Buffers; - Buf.Data = B->Buff.Data; - Buf.Generation = generation(); - Buf.Size = B->Buff.Size; - B->Used = true; return ErrorCode::Ok; } @@ -111,42 +84,29 @@ BufferQueue::ErrorCode BufferQueue::releaseBuffer(Buffer &Buf) { // backing store's range. if (Buf.Data < BackingStore || Buf.Data > - reinterpret_cast<char *>(BackingStore) + (BufferCount * BufferSize)) { - if (Buf.Generation != generation()) { - Buf.Data = nullptr; - Buf.Size = 0; - Buf.Generation = 0; - return BufferQueue::ErrorCode::Ok; - } - return BufferQueue::ErrorCode::UnrecognizedBuffer; - } + reinterpret_cast<char *>(BackingStore) + (BufferCount * BufferSize)) + return ErrorCode::UnrecognizedBuffer; - BufferRep *B = nullptr; - { - SpinMutexLock Guard(&Mutex); - - // This points to a semantic bug, we really ought to not be releasing more - // buffers than we actually get. - if (LiveBuffers == 0) - return ErrorCode::NotEnoughMemory; + SpinMutexLock Guard(&Mutex); - --LiveBuffers; - B = First++; - if (First == (Buffers + BufferCount)) - First = Buffers; - } + // This points to a semantic bug, we really ought to not be releasing more + // buffers than we actually get. + if (LiveBuffers == 0) + return ErrorCode::NotEnoughMemory; // Now that the buffer has been released, we mark it as "used". - B->Buff.Data = Buf.Data; - B->Buff.Size = Buf.Size; - B->Buff.Generation = Buf.Generation; - B->Used = true; - atomic_store(&B->Buff.Extents, - atomic_load(&Buf.Extents, memory_order_acquire), - memory_order_release); + auto Extents = atomic_load(&Buf.Extents, memory_order_acquire); + atomic_store(&First->Buff.Extents, Extents, memory_order_release); + First->Buff.Data = Buf.Data; + First->Buff.Size = Buf.Size; + First->Used = true; Buf.Data = nullptr; Buf.Size = 0; - Buf.Generation = 0; + atomic_store(&Buf.Extents, 0, memory_order_release); + --LiveBuffers; + if (++First == (Buffers + BufferCount)) + First = Buffers; + return ErrorCode::Ok; } diff --git a/lib/xray/xray_buffer_queue.h b/lib/xray/xray_buffer_queue.h index cbd42835f..c1fa9fab7 100644 --- a/lib/xray/xray_buffer_queue.h +++ b/lib/xray/xray_buffer_queue.h @@ -33,7 +33,6 @@ class BufferQueue { public: struct Buffer { atomic_uint64_t Extents{0}; - uint64_t Generation{0}; void *Data = nullptr; size_t Size = 0; }; @@ -131,10 +130,6 @@ private: // Count of buffers that have been handed out through 'getBuffer'. size_t LiveBuffers; - // We use a generation number to identify buffers and which generation they're - // associated with. - atomic_uint64_t Generation; - public: enum class ErrorCode : unsigned { Ok, @@ -142,7 +137,6 @@ public: QueueFinalizing, UnrecognizedBuffer, AlreadyFinalized, - AlreadyInitialized, }; static const char *getErrorString(ErrorCode E) { @@ -157,8 +151,6 @@ public: return "buffer being returned not owned by buffer queue"; case ErrorCode::AlreadyFinalized: return "queue already finalized"; - case ErrorCode::AlreadyInitialized: - return "queue already initialized"; } return "unknown error"; } @@ -189,23 +181,10 @@ public: /// the buffer being released. ErrorCode releaseBuffer(Buffer &Buf); - /// Initializes the buffer queue, starting a new generation. We can re-set the - /// size of buffers with |BS| along with the buffer count with |BC|. - /// - /// Returns: - /// - ErrorCode::Ok when we successfully initialize the buffer. This - /// requires that the buffer queue is previously finalized. - /// - ErrorCode::AlreadyInitialized when the buffer queue is not finalized. - ErrorCode init(size_t BS, size_t BC); - bool finalizing() const { return atomic_load(&Finalizing, memory_order_acquire); } - uint64_t generation() const { - return atomic_load(&Generation, memory_order_acquire); - } - /// Returns the configured size of the buffers in the buffer queue. size_t ConfiguredBufferSize() const { return BufferSize; } diff --git a/lib/xray/xray_fdr_logging.cc b/lib/xray/xray_fdr_logging.cc index 2479a0fa7..32188771c 100644 --- a/lib/xray/xray_fdr_logging.cc +++ b/lib/xray/xray_fdr_logging.cc @@ -1056,7 +1056,8 @@ void fdrLoggingHandleTypedEvent( endBufferIfFull(); } -XRayLogInitStatus fdrLoggingInit(size_t, size_t, void *Options, +XRayLogInitStatus fdrLoggingInit(UNUSED size_t BufferSize, + UNUSED size_t BufferMax, void *Options, size_t OptionsSize) XRAY_NEVER_INSTRUMENT { if (Options == nullptr) return XRayLogInitStatus::XRAY_LOG_UNINITIALIZED; @@ -1103,8 +1104,9 @@ XRayLogInitStatus fdrLoggingInit(size_t, size_t, void *Options, // environment-variable defined options. FDRParser.ParseString(static_cast<const char *>(Options)); *fdrFlags() = FDRFlags; - auto BufferSize = FDRFlags.buffer_size; - auto BufferMax = FDRFlags.buffer_max; + BufferSize = FDRFlags.buffer_size; + BufferMax = FDRFlags.buffer_max; + bool Success = false; if (BQ != nullptr) { |