diff options
-rw-r--r-- | libc/src/__support/RPC/CMakeLists.txt | 2 | ||||
-rw-r--r-- | libc/src/__support/RPC/rpc.h | 138 | ||||
-rw-r--r-- | libc/src/__support/RPC/rpc_util.h | 12 | ||||
-rw-r--r-- | libc/startup/gpu/amdgpu/start.cpp | 2 | ||||
-rw-r--r-- | libc/startup/gpu/nvptx/start.cpp | 2 | ||||
-rw-r--r-- | libc/test/integration/startup/gpu/CMakeLists.txt | 8 | ||||
-rw-r--r-- | libc/test/integration/startup/gpu/rpc_test.cpp | 15 | ||||
-rw-r--r-- | libc/utils/gpu/loader/Loader.h | 5 | ||||
-rw-r--r-- | libc/utils/gpu/loader/Server.h | 23 | ||||
-rw-r--r-- | libc/utils/gpu/loader/amdgpu/Loader.cpp | 11 | ||||
-rw-r--r-- | libc/utils/gpu/loader/nvptx/Loader.cpp | 8 |
11 files changed, 184 insertions, 42 deletions
diff --git a/libc/src/__support/RPC/CMakeLists.txt b/libc/src/__support/RPC/CMakeLists.txt index 1ac2a3b0548c..96bfd8e72954 100644 --- a/libc/src/__support/RPC/CMakeLists.txt +++ b/libc/src/__support/RPC/CMakeLists.txt @@ -10,6 +10,8 @@ add_header_library( DEPENDS libc.src.__support.common libc.src.__support.CPP.atomic + libc.src.__support.CPP.optional + libc.src.__support.CPP.functional libc.src.__support.GPU.utils ) diff --git a/libc/src/__support/RPC/rpc.h b/libc/src/__support/RPC/rpc.h index fc7a66f4b88d..170f74830b14 100644 --- a/libc/src/__support/RPC/rpc.h +++ b/libc/src/__support/RPC/rpc.h @@ -20,6 +20,7 @@ #include "rpc_util.h" #include "src/__support/CPP/atomic.h" +#include "src/__support/CPP/functional.h" #include "src/__support/CPP/optional.h" #include "src/__support/GPU/utils.h" #include "src/string/memory_utils/memcpy_implementations.h" @@ -38,12 +39,36 @@ enum Opcode : uint16_t { }; /// A fixed size channel used to communicate between the RPC client and server. -struct alignas(64) Buffer { - uint8_t data[62]; - uint16_t opcode; +struct Buffer { + uint64_t data[8]; }; static_assert(sizeof(Buffer) == 64, "Buffer size mismatch"); +/// The information associated with a packet. This indicates which operations to +/// perform and which threads are active in the slots. +struct Header { + uint64_t mask; + uint16_t opcode; +}; + +/// The data payload for the associated packet. We provide enough space for each +/// thread in the cooperating lane to have a buffer. +struct Payload { +#if defined(LIBC_TARGET_ARCH_IS_GPU) + Buffer slot[gpu::LANE_SIZE]; +#else + // Flexible array size allocated at runtime to the appropriate size. + Buffer slot[]; +#endif +}; + +/// A packet used to share data between the client and server across an entire +/// lane. We use a lane as the minimum granularity for execution. +struct alignas(64) Packet { + Header header; + Payload payload; +}; + /// A common process used to synchronize communication between a client and a /// server. 
The process contains an inbox and an outbox used for signaling /// ownership of the shared buffer between both sides. @@ -71,18 +96,21 @@ template <bool InvertInbox> struct Process { LIBC_INLINE Process &operator=(const Process &) = default; LIBC_INLINE ~Process() = default; + uint32_t lane_size; cpp::Atomic<uint32_t> *lock; cpp::Atomic<uint32_t> *inbox; cpp::Atomic<uint32_t> *outbox; - Buffer *buffer; + Packet *buffer; /// Initialize the communication channels. - LIBC_INLINE void reset(void *lock, void *inbox, void *outbox, void *buffer) { + LIBC_INLINE void reset(uint32_t lane_size, void *lock, void *inbox, + void *outbox, void *buffer) { *this = { + lane_size, reinterpret_cast<cpp::Atomic<uint32_t> *>(lock), reinterpret_cast<cpp::Atomic<uint32_t> *>(inbox), reinterpret_cast<cpp::Atomic<uint32_t> *>(outbox), - reinterpret_cast<Buffer *>(buffer), + reinterpret_cast<Packet *>(buffer), }; } @@ -144,7 +172,8 @@ template <bool InvertInbox> struct Process { return lane_mask != packed; } - // Unlock the lock at index. + /// Unlock the lock at index. We need a lane sync to keep this function + /// convergent, otherwise the compiler will sink the store and deadlock. [[clang::convergent]] LIBC_INLINE void unlock(uint64_t lane_mask, uint64_t index) { // Wait for other threads in the warp to finish using the lock @@ -156,6 +185,31 @@ template <bool InvertInbox> struct Process { // warp dropping the lock again. uint32_t and_mask = ~(rpc::is_first_lane(lane_mask) ? 1 : 0); lock[index].fetch_and(and_mask, cpp::MemoryOrder::RELAXED); + gpu::sync_lane(lane_mask); + } + + /// Invokes a function across every active buffer across the total lane size. 
+ LIBC_INLINE void invoke_rpc(cpp::function<void(Buffer *)> fn, + uint32_t index) { + if constexpr (is_process_gpu()) { + fn(&buffer[index].payload.slot[gpu::get_lane_id()]); + } else { + for (uint32_t i = 0; i < lane_size; i += gpu::get_lane_size()) + if (buffer[index].header.mask & 1ul << i) + fn(&buffer[index].payload.slot[i]); + } + } + + /// Alternate version that also provides the index of the current lane. + LIBC_INLINE void invoke_rpc(cpp::function<void(Buffer *, uint32_t)> fn, + uint32_t index) { + if constexpr (is_process_gpu()) { + fn(&buffer[index].payload.slot[gpu::get_lane_id()], gpu::get_lane_id()); + } else { + for (uint32_t i = 0; i < lane_size; i += gpu::get_lane_size()) + if (buffer[index].header.mask & 1ul << i) + fn(&buffer[index].payload.slot[i], i); + } } }; @@ -180,7 +234,7 @@ template <bool T> struct Port { template <typename A> LIBC_INLINE void recv_n(A alloc); LIBC_INLINE uint16_t get_opcode() const { - return process.buffer[index].opcode; + return process.buffer[index].header.opcode; } LIBC_INLINE void close() { process.unlock(lane_mask, index); } @@ -227,7 +281,7 @@ template <bool T> template <typename F> LIBC_INLINE void Port<T>::send(F fill) { } // Apply the \p fill function to initialize the buffer and release the memory. - fill(&process.buffer[index]); + process.invoke_rpc(fill, index); out = !out; atomic_thread_fence(cpp::MemoryOrder::RELEASE); process.outbox[index].store(out, cpp::MemoryOrder::RELAXED); @@ -245,7 +299,7 @@ template <bool T> template <typename U> LIBC_INLINE void Port<T>::recv(U use) { atomic_thread_fence(cpp::MemoryOrder::ACQUIRE); // Apply the \p use function to read the memory out of the buffer. 
- use(&process.buffer[index]); + process.invoke_rpc(use, index); out = !out; process.outbox[index].store(out, cpp::MemoryOrder::RELAXED); } @@ -274,7 +328,10 @@ template <bool T> LIBC_INLINE void Port<T>::send_n(const void *src, uint64_t size) { // TODO: We could send the first bytes in this call and potentially save an // extra send operation. - send([=](Buffer *buffer) { buffer->data[0] = size; }); + // TODO: We may need a way for the CPU to send different strings per thread. + send([=](Buffer *buffer) { + reinterpret_cast<uint64_t *>(buffer->data)[0] = size; + }); const uint8_t *ptr = reinterpret_cast<const uint8_t *>(src); for (uint64_t idx = 0; idx < size; idx += sizeof(Buffer::data)) { send([=](Buffer *buffer) { @@ -283,6 +340,7 @@ LIBC_INLINE void Port<T>::send_n(const void *src, uint64_t size) { inline_memcpy(buffer->data, ptr + idx, len); }); } + gpu::sync_lane(process.buffer[index].header.mask); } /// Receives an arbitrarily sized data buffer across the shared channel in @@ -291,15 +349,42 @@ LIBC_INLINE void Port<T>::send_n(const void *src, uint64_t size) { template <bool T> template <typename A> LIBC_INLINE void Port<T>::recv_n(A alloc) { - uint64_t size = 0; - recv([&](Buffer *buffer) { size = buffer->data[0]; }); - uint8_t *dst = reinterpret_cast<uint8_t *>(alloc(size)); - for (uint64_t idx = 0; idx < size; idx += sizeof(Buffer::data)) { - recv([=](Buffer *buffer) { - uint64_t len = - size - idx > sizeof(Buffer::data) ? sizeof(Buffer::data) : size - idx; - inline_memcpy(dst + idx, buffer->data, len); + // The GPU handles thread private variables and masking implicitly through its + // execution model. If this is the CPU we need to manually handle the + // possibility that the sent data is of different length. 
+ if constexpr (is_process_gpu()) { + uint64_t size = 0; + recv([&](Buffer *buffer) { + size = reinterpret_cast<uint64_t *>(buffer->data)[0]; + }); + uint8_t *dst = reinterpret_cast<uint8_t *>(alloc(size, gpu::get_lane_id())); + for (uint64_t idx = 0; idx < size; idx += sizeof(Buffer::data)) { + recv([=](Buffer *buffer) { + uint64_t len = size - idx > sizeof(Buffer::data) ? sizeof(Buffer::data) + : size - idx; + inline_memcpy(dst + idx, buffer->data, len); + }); + } + return; + } else { + uint64_t size[MAX_LANE_SIZE]; + uint8_t *dst[MAX_LANE_SIZE]; + uint64_t max = 0; + recv([&](Buffer *buffer, uint32_t id) { + size[id] = reinterpret_cast<uint64_t *>(buffer->data)[0]; + dst[id] = reinterpret_cast<uint8_t *>(alloc(size[id], id)); + max = size[id] > max ? size[id] : max; }); + for (uint64_t idx = 0; idx < max; idx += sizeof(Buffer::data)) { + recv([=](Buffer *buffer, uint32_t id) { + uint64_t len = size[id] - idx > sizeof(Buffer::data) + ? sizeof(Buffer::data) + : size[id] - idx; + if (idx < size[id]) + inline_memcpy(dst[id] + idx, buffer->data, len); + }); + } + return; } } @@ -307,7 +392,10 @@ LIBC_INLINE void Port<T>::recv_n(A alloc) { /// port if we find an index that is in a valid sending state. That is, there /// are send operations pending that haven't been serviced on this port. Each /// port instance uses an associated \p opcode to tell the server what to do. -LIBC_INLINE cpp::optional<Client::Port> Client::try_open(uint16_t opcode) { +/// Opening a port is only valid if the `opcode` is the same across every +/// participating thread. +[[clang::convergent]] LIBC_INLINE cpp::optional<Client::Port> +Client::try_open(uint16_t opcode) { constexpr uint64_t index = 0; const uint64_t lane_mask = gpu::get_lane_mask(); @@ -323,13 +411,16 @@ LIBC_INLINE cpp::optional<Client::Port> Client::try_open(uint16_t opcode) { // Once we acquire the index we need to check if we are in a valid sending // state. 
- if (buffer_unavailable(in, out)) { unlock(lane_mask, index); return cpp::nullopt; } - buffer->opcode = opcode; + if (is_first_lane(lane_mask)) { + buffer[index].header.opcode = opcode; + buffer[index].header.mask = lane_mask; + } + gpu::sync_lane(lane_mask); return Port(*this, lane_mask, index, out); } @@ -343,7 +434,8 @@ LIBC_INLINE Client::Port Client::open(uint16_t opcode) { /// Attempts to open a port to use as the server. The server can only open a /// port if it has a pending receive operation -LIBC_INLINE cpp::optional<Server::Port> Server::try_open() { +[[clang::convergent]] LIBC_INLINE cpp::optional<Server::Port> +Server::try_open() { constexpr uint64_t index = 0; const uint64_t lane_mask = gpu::get_lane_mask(); diff --git a/libc/src/__support/RPC/rpc_util.h b/libc/src/__support/RPC/rpc_util.h index 349a5317bf4e..224723ef20c9 100644 --- a/libc/src/__support/RPC/rpc_util.h +++ b/libc/src/__support/RPC/rpc_util.h @@ -16,6 +16,9 @@ namespace __llvm_libc { namespace rpc { +/// Maximum amount of data a single lane can use. +constexpr uint64_t MAX_LANE_SIZE = 64; + /// Suspend the thread briefly to assist the thread scheduler during busy loops. LIBC_INLINE void sleep_briefly() { #if defined(LIBC_TARGET_ARCH_IS_NVPTX) && __CUDA_ARCH__ >= 700 @@ -37,6 +40,15 @@ LIBC_INLINE bool is_first_lane(uint64_t lane_mask) { return gpu::get_lane_id() == get_first_lane_id(lane_mask); } +/// Conditional to indicate if this process is running on the GPU. 
+LIBC_INLINE constexpr bool is_process_gpu() { +#if defined(LIBC_TARGET_ARCH_IS_GPU) + return true; +#else + return false; +#endif +} + } // namespace rpc } // namespace __llvm_libc diff --git a/libc/startup/gpu/amdgpu/start.cpp b/libc/startup/gpu/amdgpu/start.cpp index ab83ea59695c..b28ad7960bf2 100644 --- a/libc/startup/gpu/amdgpu/start.cpp +++ b/libc/startup/gpu/amdgpu/start.cpp @@ -52,7 +52,7 @@ void initialize(int argc, char **argv, char **env, void *in, void *out, if (gpu::get_thread_id() == 0 && gpu::get_block_id() == 0) { // We need to set up the RPC client first in case any of the constructors // require it. - rpc::client.reset(&lock, in, out, buffer); + rpc::client.reset(gpu::get_lane_size(), &lock, in, out, buffer); // We want the fini array callbacks to be run after other atexit // callbacks are run. So, we register them before running the init diff --git a/libc/startup/gpu/nvptx/start.cpp b/libc/startup/gpu/nvptx/start.cpp index fe09666a33de..9ed755987a5d 100644 --- a/libc/startup/gpu/nvptx/start.cpp +++ b/libc/startup/gpu/nvptx/start.cpp @@ -57,7 +57,7 @@ void initialize(int argc, char **argv, char **env, void *in, void *out, if (gpu::get_thread_id() == 0 && gpu::get_block_id() == 0) { // We need to set up the RPC client first in case any of the constructors // require it. - rpc::client.reset(&lock, in, out, buffer); + rpc::client.reset(gpu::get_lane_size(), &lock, in, out, buffer); // We want the fini array callbacks to be run after other atexit // callbacks are run. 
So, we register them before running the init diff --git a/libc/test/integration/startup/gpu/CMakeLists.txt b/libc/test/integration/startup/gpu/CMakeLists.txt index 754f36d8789c..d2028cc941f0 100644 --- a/libc/test/integration/startup/gpu/CMakeLists.txt +++ b/libc/test/integration/startup/gpu/CMakeLists.txt @@ -22,8 +22,12 @@ add_integration_test( libc.src.__support.RPC.rpc_client libc.src.__support.GPU.utils LOADER_ARGS - --blocks 16 - --threads 1 + --blocks-x 2 + --blocks-y 2 + --blocks-z 2 + --threads-x 4 + --threads-y 4 + --threads-z 4 ) add_integration_test( diff --git a/libc/test/integration/startup/gpu/rpc_test.cpp b/libc/test/integration/startup/gpu/rpc_test.cpp index daf7bf77302c..9dc2214fde41 100644 --- a/libc/test/integration/startup/gpu/rpc_test.cpp +++ b/libc/test/integration/startup/gpu/rpc_test.cpp @@ -13,7 +13,8 @@ using namespace __llvm_libc; static void test_add_simple() { - uint32_t num_additions = 1000 + 10 * gpu::get_block_id_x(); + uint32_t num_additions = + 10 + 10 * gpu::get_thread_id() + 10 * gpu::get_block_id(); uint64_t cnt = 0; for (uint32_t i = 0; i < num_additions; ++i) { rpc::Client::Port port = rpc::client.open(rpc::TEST_INCREMENT); @@ -29,8 +30,20 @@ static void test_add_simple() { ASSERT_TRUE(cnt == num_additions && "Incorrect sum"); } +// Test to ensure that the RPC mechanism doesn't hang on divergence. 
+static void test_noop(uint8_t data) { + rpc::Client::Port port = rpc::client.open(rpc::NOOP); + port.send([=](rpc::Buffer *buffer) { buffer->data[0] = data; }); + port.close(); +} + TEST_MAIN(int argc, char **argv, char **envp) { test_add_simple(); + if (gpu::get_thread_id() % 2) + test_noop(1); + else + test_noop(2); + return 0; } diff --git a/libc/utils/gpu/loader/Loader.h b/libc/utils/gpu/loader/Loader.h index 9c6413ee45d8..feaa8e0079bb 100644 --- a/libc/utils/gpu/loader/Loader.h +++ b/libc/utils/gpu/loader/Loader.h @@ -29,6 +29,11 @@ struct LaunchParameters { int load(int argc, char **argv, char **evnp, void *image, size_t size, const LaunchParameters ¶ms); +/// Return \p V aligned "upwards" according to \p Align. +template <typename V, typename A> inline V align_up(V val, A align) { + return ((val + V(align) - 1) / V(align)) * V(align); +} + /// Copy the system's argument vector to GPU memory allocated using \p alloc. template <typename Allocator> void *copy_argument_vector(int argc, char **argv, Allocator alloc) { diff --git a/libc/utils/gpu/loader/Server.h b/libc/utils/gpu/loader/Server.h index cd043359b1ea..6ffb32955c89 100644 --- a/libc/utils/gpu/loader/Server.h +++ b/libc/utils/gpu/loader/Server.h @@ -30,15 +30,19 @@ void handle_server() { switch (port->get_opcode()) { case __llvm_libc::rpc::Opcode::PRINT_TO_STDERR: { - uint64_t str_size; - char *str = nullptr; - port->recv_n([&](uint64_t size) { - str_size = size; - str = new char[size]; - return str; + uint64_t str_size[__llvm_libc::rpc::MAX_LANE_SIZE] = {0}; + char *strs[__llvm_libc::rpc::MAX_LANE_SIZE] = {nullptr}; + port->recv_n([&](uint64_t size, uint32_t id) { + str_size[id] = size; + strs[id] = new char[size]; + return strs[id]; }); - fwrite(str, str_size, 1, stderr); - delete[] str; + for (uint64_t i = 0; i < __llvm_libc::rpc::MAX_LANE_SIZE; ++i) { + if (strs[i]) { + fwrite(strs[i], str_size[i], 1, stderr); + delete[] strs[i]; + } + } break; } case __llvm_libc::rpc::Opcode::EXIT: { @@ -54,8 
+58,7 @@ void handle_server() { break; } default: - port->recv([](__llvm_libc::rpc::Buffer *) { /* no-op */ }); - return; + port->recv([](__llvm_libc::rpc::Buffer *buffer) {}); } port->close(); } diff --git a/libc/utils/gpu/loader/amdgpu/Loader.cpp b/libc/utils/gpu/loader/amdgpu/Loader.cpp index af5f00878e65..f9a7b75ff11b 100644 --- a/libc/utils/gpu/loader/amdgpu/Loader.cpp +++ b/libc/utils/gpu/loader/amdgpu/Loader.cpp @@ -287,6 +287,10 @@ int load(int argc, char **argv, char **envp, void *image, size_t size, hsa_amd_memory_fill(dev_ret, 0, sizeof(int)); // Allocate finegrained memory for the RPC server and client to share. + uint32_t wavefront_size = 0; + if (hsa_status_t err = hsa_agent_get_info( + dev_agent, HSA_AGENT_INFO_WAVEFRONT_SIZE, &wavefront_size)) + handle_error(err); void *server_inbox; void *server_outbox; void *buffer; @@ -299,7 +303,10 @@ int load(int argc, char **argv, char **envp, void *image, size_t size, /*flags=*/0, &server_outbox)) handle_error(err); if (hsa_status_t err = hsa_amd_memory_pool_allocate( - finegrained_pool, sizeof(__llvm_libc::rpc::Buffer), + finegrained_pool, + align_up(sizeof(__llvm_libc::rpc::Header) + + (wavefront_size * sizeof(__llvm_libc::rpc::Buffer)), + alignof(__llvm_libc::rpc::Packet)), /*flags=*/0, &buffer)) handle_error(err); hsa_amd_agents_allow_access(1, &dev_agent, nullptr, server_inbox); @@ -351,7 +358,7 @@ int load(int argc, char **argv, char **envp, void *image, size_t size, handle_error(err); // Initialize the RPC server's buffer for host-device communication. - server.reset(&lock, server_inbox, server_outbox, buffer); + server.reset(wavefront_size, &lock, server_inbox, server_outbox, buffer); // Initialize the packet header and set the doorbell signal to begin execution // by the HSA runtime. 
diff --git a/libc/utils/gpu/loader/nvptx/Loader.cpp b/libc/utils/gpu/loader/nvptx/Loader.cpp index baf8baaff7cd..77e6967dd022 100644 --- a/libc/utils/gpu/loader/nvptx/Loader.cpp +++ b/libc/utils/gpu/loader/nvptx/Loader.cpp @@ -232,9 +232,13 @@ int load(int argc, char **argv, char **envp, void *image, size_t size, if (CUresult err = cuMemsetD32(dev_ret, 0, 1)) handle_error(err); + uint32_t warp_size = 32; void *server_inbox = allocator(sizeof(__llvm_libc::cpp::Atomic<int>)); void *server_outbox = allocator(sizeof(__llvm_libc::cpp::Atomic<int>)); - void *buffer = allocator(sizeof(__llvm_libc::rpc::Buffer)); + void *buffer = + allocator(align_up(sizeof(__llvm_libc::rpc::Header) + + (warp_size * sizeof(__llvm_libc::rpc::Buffer)), + alignof(__llvm_libc::rpc::Packet))); if (!server_inbox || !server_outbox || !buffer) handle_error("Failed to allocate memory the RPC client / server."); @@ -254,7 +258,7 @@ int load(int argc, char **argv, char **envp, void *image, size_t size, CU_LAUNCH_PARAM_END}; // Initialize the RPC server's buffer for host-device communication. - server.reset(&lock, server_inbox, server_outbox, buffer); + server.reset(warp_size, &lock, server_inbox, server_outbox, buffer); // Call the kernel with the given arguments. if (CUresult err = cuLaunchKernel( |