summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
-rw-r--r--  libc/src/__support/RPC/CMakeLists.txt              |   2
-rw-r--r--  libc/src/__support/RPC/rpc.h                       | 138
-rw-r--r--  libc/src/__support/RPC/rpc_util.h                  |  12
-rw-r--r--  libc/startup/gpu/amdgpu/start.cpp                  |   2
-rw-r--r--  libc/startup/gpu/nvptx/start.cpp                   |   2
-rw-r--r--  libc/test/integration/startup/gpu/CMakeLists.txt   |   8
-rw-r--r--  libc/test/integration/startup/gpu/rpc_test.cpp     |  15
-rw-r--r--  libc/utils/gpu/loader/Loader.h                     |   5
-rw-r--r--  libc/utils/gpu/loader/Server.h                     |  23
-rw-r--r--  libc/utils/gpu/loader/amdgpu/Loader.cpp            |  11
-rw-r--r--  libc/utils/gpu/loader/nvptx/Loader.cpp             |   8
11 files changed, 184 insertions, 42 deletions
diff --git a/libc/src/__support/RPC/CMakeLists.txt b/libc/src/__support/RPC/CMakeLists.txt
index 1ac2a3b0548c..96bfd8e72954 100644
--- a/libc/src/__support/RPC/CMakeLists.txt
+++ b/libc/src/__support/RPC/CMakeLists.txt
@@ -10,6 +10,8 @@ add_header_library(
DEPENDS
libc.src.__support.common
libc.src.__support.CPP.atomic
+ libc.src.__support.CPP.optional
+ libc.src.__support.CPP.functional
libc.src.__support.GPU.utils
)
diff --git a/libc/src/__support/RPC/rpc.h b/libc/src/__support/RPC/rpc.h
index fc7a66f4b88d..170f74830b14 100644
--- a/libc/src/__support/RPC/rpc.h
+++ b/libc/src/__support/RPC/rpc.h
@@ -20,6 +20,7 @@
#include "rpc_util.h"
#include "src/__support/CPP/atomic.h"
+#include "src/__support/CPP/functional.h"
#include "src/__support/CPP/optional.h"
#include "src/__support/GPU/utils.h"
#include "src/string/memory_utils/memcpy_implementations.h"
@@ -38,12 +39,36 @@ enum Opcode : uint16_t {
};
/// A fixed size channel used to communicate between the RPC client and server.
-struct alignas(64) Buffer {
- uint8_t data[62];
- uint16_t opcode;
+struct Buffer {
+ uint64_t data[8];
};
static_assert(sizeof(Buffer) == 64, "Buffer size mismatch");
+/// The information associated with a packet. This indicates which operations to
+/// perform and which threads are active in the slots.
+struct Header {
+ uint64_t mask;
+ uint16_t opcode;
+};
+
+/// The data payload for the associated packet. We provide enough space for each
+/// thread in the cooperating lane to have a buffer.
+struct Payload {
+#if defined(LIBC_TARGET_ARCH_IS_GPU)
+ Buffer slot[gpu::LANE_SIZE];
+#else
+ // Flexible array size allocated at runtime to the appropriate size.
+ Buffer slot[];
+#endif
+};
+
+/// A packet used to share data between the client and server across an entire
+/// lane. We use a lane as the minimum granularity for execution.
+struct alignas(64) Packet {
+ Header header;
+ Payload payload;
+};
+
/// A common process used to synchronize communication between a client and a
/// server. The process contains an inbox and an outbox used for signaling
/// ownership of the shared buffer between both sides.
@@ -71,18 +96,21 @@ template <bool InvertInbox> struct Process {
LIBC_INLINE Process &operator=(const Process &) = default;
LIBC_INLINE ~Process() = default;
+ uint32_t lane_size;
cpp::Atomic<uint32_t> *lock;
cpp::Atomic<uint32_t> *inbox;
cpp::Atomic<uint32_t> *outbox;
- Buffer *buffer;
+ Packet *buffer;
/// Initialize the communication channels.
- LIBC_INLINE void reset(void *lock, void *inbox, void *outbox, void *buffer) {
+ LIBC_INLINE void reset(uint32_t lane_size, void *lock, void *inbox,
+ void *outbox, void *buffer) {
*this = {
+ lane_size,
reinterpret_cast<cpp::Atomic<uint32_t> *>(lock),
reinterpret_cast<cpp::Atomic<uint32_t> *>(inbox),
reinterpret_cast<cpp::Atomic<uint32_t> *>(outbox),
- reinterpret_cast<Buffer *>(buffer),
+ reinterpret_cast<Packet *>(buffer),
};
}
@@ -144,7 +172,8 @@ template <bool InvertInbox> struct Process {
return lane_mask != packed;
}
- // Unlock the lock at index.
+ /// Unlock the lock at index. We need a lane sync to keep this function
+ /// convergent, otherwise the compiler will sink the store and deadlock.
[[clang::convergent]] LIBC_INLINE void unlock(uint64_t lane_mask,
uint64_t index) {
// Wait for other threads in the warp to finish using the lock
@@ -156,6 +185,31 @@ template <bool InvertInbox> struct Process {
// warp dropping the lock again.
uint32_t and_mask = ~(rpc::is_first_lane(lane_mask) ? 1 : 0);
lock[index].fetch_and(and_mask, cpp::MemoryOrder::RELAXED);
+ gpu::sync_lane(lane_mask);
+ }
+
+ /// Invokes a function across every active buffer in the total lane size.
+ LIBC_INLINE void invoke_rpc(cpp::function<void(Buffer *)> fn,
+ uint32_t index) {
+ if constexpr (is_process_gpu()) {
+ fn(&buffer[index].payload.slot[gpu::get_lane_id()]);
+ } else {
+ for (uint32_t i = 0; i < lane_size; i += gpu::get_lane_size())
+ if (buffer[index].header.mask & 1ul << i)
+ fn(&buffer[index].payload.slot[i]);
+ }
+ }
+
+ /// Alternate version that also provides the index of the current lane.
+ LIBC_INLINE void invoke_rpc(cpp::function<void(Buffer *, uint32_t)> fn,
+ uint32_t index) {
+ if constexpr (is_process_gpu()) {
+ fn(&buffer[index].payload.slot[gpu::get_lane_id()], gpu::get_lane_id());
+ } else {
+ for (uint32_t i = 0; i < lane_size; i += gpu::get_lane_size())
+ if (buffer[index].header.mask & 1ul << i)
+ fn(&buffer[index].payload.slot[i], i);
+ }
}
};
@@ -180,7 +234,7 @@ template <bool T> struct Port {
template <typename A> LIBC_INLINE void recv_n(A alloc);
LIBC_INLINE uint16_t get_opcode() const {
- return process.buffer[index].opcode;
+ return process.buffer[index].header.opcode;
}
LIBC_INLINE void close() { process.unlock(lane_mask, index); }
@@ -227,7 +281,7 @@ template <bool T> template <typename F> LIBC_INLINE void Port<T>::send(F fill) {
}
// Apply the \p fill function to initialize the buffer and release the memory.
- fill(&process.buffer[index]);
+ process.invoke_rpc(fill, index);
out = !out;
atomic_thread_fence(cpp::MemoryOrder::RELEASE);
process.outbox[index].store(out, cpp::MemoryOrder::RELAXED);
@@ -245,7 +299,7 @@ template <bool T> template <typename U> LIBC_INLINE void Port<T>::recv(U use) {
atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);
// Apply the \p use function to read the memory out of the buffer.
- use(&process.buffer[index]);
+ process.invoke_rpc(use, index);
out = !out;
process.outbox[index].store(out, cpp::MemoryOrder::RELAXED);
}
@@ -274,7 +328,10 @@ template <bool T>
LIBC_INLINE void Port<T>::send_n(const void *src, uint64_t size) {
// TODO: We could send the first bytes in this call and potentially save an
// extra send operation.
- send([=](Buffer *buffer) { buffer->data[0] = size; });
+ // TODO: We may need a way for the CPU to send different strings per thread.
+ send([=](Buffer *buffer) {
+ reinterpret_cast<uint64_t *>(buffer->data)[0] = size;
+ });
const uint8_t *ptr = reinterpret_cast<const uint8_t *>(src);
for (uint64_t idx = 0; idx < size; idx += sizeof(Buffer::data)) {
send([=](Buffer *buffer) {
@@ -283,6 +340,7 @@ LIBC_INLINE void Port<T>::send_n(const void *src, uint64_t size) {
inline_memcpy(buffer->data, ptr + idx, len);
});
}
+ gpu::sync_lane(process.buffer[index].header.mask);
}
/// Receives an arbitrarily sized data buffer across the shared channel in
@@ -291,15 +349,42 @@ LIBC_INLINE void Port<T>::send_n(const void *src, uint64_t size) {
template <bool T>
template <typename A>
LIBC_INLINE void Port<T>::recv_n(A alloc) {
- uint64_t size = 0;
- recv([&](Buffer *buffer) { size = buffer->data[0]; });
- uint8_t *dst = reinterpret_cast<uint8_t *>(alloc(size));
- for (uint64_t idx = 0; idx < size; idx += sizeof(Buffer::data)) {
- recv([=](Buffer *buffer) {
- uint64_t len =
- size - idx > sizeof(Buffer::data) ? sizeof(Buffer::data) : size - idx;
- inline_memcpy(dst + idx, buffer->data, len);
+ // The GPU handles thread private variables and masking implicitly through its
+ // execution model. If this is the CPU we need to manually handle the
+ // possibility that the sent data is of different length.
+ if constexpr (is_process_gpu()) {
+ uint64_t size = 0;
+ recv([&](Buffer *buffer) {
+ size = reinterpret_cast<uint64_t *>(buffer->data)[0];
+ });
+ uint8_t *dst = reinterpret_cast<uint8_t *>(alloc(size), gpu::get_lane_id());
+ for (uint64_t idx = 0; idx < size; idx += sizeof(Buffer::data)) {
+ recv([=](Buffer *buffer) {
+ uint64_t len = size - idx > sizeof(Buffer::data) ? sizeof(Buffer::data)
+ : size - idx;
+ inline_memcpy(dst + idx, buffer->data, len);
+ });
+ }
+ return;
+ } else {
+ uint64_t size[MAX_LANE_SIZE];
+ uint8_t *dst[MAX_LANE_SIZE];
+ uint64_t max = 0;
+ recv([&](Buffer *buffer, uint32_t id) {
+ size[id] = reinterpret_cast<uint64_t *>(buffer->data)[0];
+ dst[id] = reinterpret_cast<uint8_t *>(alloc(size[id], id));
+ max = size[id] > max ? size[id] : max;
});
+ for (uint64_t idx = 0; idx < max; idx += sizeof(Buffer::data)) {
+ recv([=](Buffer *buffer, uint32_t id) {
+ uint64_t len = size[id] - idx > sizeof(Buffer::data)
+ ? sizeof(Buffer::data)
+ : size[id] - idx;
+ if (idx < size[id])
+ inline_memcpy(dst[id] + idx, buffer->data, len);
+ });
+ }
+ return;
}
}
@@ -307,7 +392,10 @@ LIBC_INLINE void Port<T>::recv_n(A alloc) {
/// port if we find an index that is in a valid sending state. That is, there
/// are send operations pending that haven't been serviced on this port. Each
/// port instance uses an associated \p opcode to tell the server what to do.
-LIBC_INLINE cpp::optional<Client::Port> Client::try_open(uint16_t opcode) {
+/// Opening a port is only valid if the `opcode` is the same across every
+/// participating thread.
+[[clang::convergent]] LIBC_INLINE cpp::optional<Client::Port>
+Client::try_open(uint16_t opcode) {
constexpr uint64_t index = 0;
const uint64_t lane_mask = gpu::get_lane_mask();
@@ -323,13 +411,16 @@ LIBC_INLINE cpp::optional<Client::Port> Client::try_open(uint16_t opcode) {
// Once we acquire the index we need to check if we are in a valid sending
// state.
-
if (buffer_unavailable(in, out)) {
unlock(lane_mask, index);
return cpp::nullopt;
}
- buffer->opcode = opcode;
+ if (is_first_lane(lane_mask)) {
+ buffer[index].header.opcode = opcode;
+ buffer[index].header.mask = lane_mask;
+ }
+ gpu::sync_lane(lane_mask);
return Port(*this, lane_mask, index, out);
}
@@ -343,7 +434,8 @@ LIBC_INLINE Client::Port Client::open(uint16_t opcode) {
/// Attempts to open a port to use as the server. The server can only open a
/// port if it has a pending receive operation
-LIBC_INLINE cpp::optional<Server::Port> Server::try_open() {
+[[clang::convergent]] LIBC_INLINE cpp::optional<Server::Port>
+Server::try_open() {
constexpr uint64_t index = 0;
const uint64_t lane_mask = gpu::get_lane_mask();
diff --git a/libc/src/__support/RPC/rpc_util.h b/libc/src/__support/RPC/rpc_util.h
index 349a5317bf4e..224723ef20c9 100644
--- a/libc/src/__support/RPC/rpc_util.h
+++ b/libc/src/__support/RPC/rpc_util.h
@@ -16,6 +16,9 @@
namespace __llvm_libc {
namespace rpc {
+/// Maximum amount of data a single lane can use.
+constexpr uint64_t MAX_LANE_SIZE = 64;
+
/// Suspend the thread briefly to assist the thread scheduler during busy loops.
LIBC_INLINE void sleep_briefly() {
#if defined(LIBC_TARGET_ARCH_IS_NVPTX) && __CUDA_ARCH__ >= 700
@@ -37,6 +40,15 @@ LIBC_INLINE bool is_first_lane(uint64_t lane_mask) {
return gpu::get_lane_id() == get_first_lane_id(lane_mask);
}
+/// Conditional to indicate if this process is running on the GPU.
+LIBC_INLINE constexpr bool is_process_gpu() {
+#if defined(LIBC_TARGET_ARCH_IS_GPU)
+ return true;
+#else
+ return false;
+#endif
+}
+
} // namespace rpc
} // namespace __llvm_libc
diff --git a/libc/startup/gpu/amdgpu/start.cpp b/libc/startup/gpu/amdgpu/start.cpp
index ab83ea59695c..b28ad7960bf2 100644
--- a/libc/startup/gpu/amdgpu/start.cpp
+++ b/libc/startup/gpu/amdgpu/start.cpp
@@ -52,7 +52,7 @@ void initialize(int argc, char **argv, char **env, void *in, void *out,
if (gpu::get_thread_id() == 0 && gpu::get_block_id() == 0) {
// We need to set up the RPC client first in case any of the constructors
// require it.
- rpc::client.reset(&lock, in, out, buffer);
+ rpc::client.reset(gpu::get_lane_size(), &lock, in, out, buffer);
// We want the fini array callbacks to be run after other atexit
// callbacks are run. So, we register them before running the init
diff --git a/libc/startup/gpu/nvptx/start.cpp b/libc/startup/gpu/nvptx/start.cpp
index fe09666a33de..9ed755987a5d 100644
--- a/libc/startup/gpu/nvptx/start.cpp
+++ b/libc/startup/gpu/nvptx/start.cpp
@@ -57,7 +57,7 @@ void initialize(int argc, char **argv, char **env, void *in, void *out,
if (gpu::get_thread_id() == 0 && gpu::get_block_id() == 0) {
// We need to set up the RPC client first in case any of the constructors
// require it.
- rpc::client.reset(&lock, in, out, buffer);
+ rpc::client.reset(gpu::get_lane_size(), &lock, in, out, buffer);
// We want the fini array callbacks to be run after other atexit
// callbacks are run. So, we register them before running the init
diff --git a/libc/test/integration/startup/gpu/CMakeLists.txt b/libc/test/integration/startup/gpu/CMakeLists.txt
index 754f36d8789c..d2028cc941f0 100644
--- a/libc/test/integration/startup/gpu/CMakeLists.txt
+++ b/libc/test/integration/startup/gpu/CMakeLists.txt
@@ -22,8 +22,12 @@ add_integration_test(
libc.src.__support.RPC.rpc_client
libc.src.__support.GPU.utils
LOADER_ARGS
- --blocks 16
- --threads 1
+ --blocks-x 2
+ --blocks-y 2
+ --blocks-z 2
+ --threads-x 4
+ --threads-y 4
+ --threads-z 4
)
add_integration_test(
diff --git a/libc/test/integration/startup/gpu/rpc_test.cpp b/libc/test/integration/startup/gpu/rpc_test.cpp
index daf7bf77302c..9dc2214fde41 100644
--- a/libc/test/integration/startup/gpu/rpc_test.cpp
+++ b/libc/test/integration/startup/gpu/rpc_test.cpp
@@ -13,7 +13,8 @@
using namespace __llvm_libc;
static void test_add_simple() {
- uint32_t num_additions = 1000 + 10 * gpu::get_block_id_x();
+ uint32_t num_additions =
+ 10 + 10 * gpu::get_thread_id() + 10 * gpu::get_block_id();
uint64_t cnt = 0;
for (uint32_t i = 0; i < num_additions; ++i) {
rpc::Client::Port port = rpc::client.open(rpc::TEST_INCREMENT);
@@ -29,8 +30,20 @@ static void test_add_simple() {
ASSERT_TRUE(cnt == num_additions && "Incorrect sum");
}
+// Test to ensure that the RPC mechanism doesn't hang on divergence.
+static void test_noop(uint8_t data) {
+ rpc::Client::Port port = rpc::client.open(rpc::NOOP);
+ port.send([=](rpc::Buffer *buffer) { buffer->data[0] = data; });
+ port.close();
+}
+
TEST_MAIN(int argc, char **argv, char **envp) {
test_add_simple();
+ if (gpu::get_thread_id() % 2)
+ test_noop(1);
+ else
+ test_noop(2);
+
return 0;
}
diff --git a/libc/utils/gpu/loader/Loader.h b/libc/utils/gpu/loader/Loader.h
index 9c6413ee45d8..feaa8e0079bb 100644
--- a/libc/utils/gpu/loader/Loader.h
+++ b/libc/utils/gpu/loader/Loader.h
@@ -29,6 +29,11 @@ struct LaunchParameters {
int load(int argc, char **argv, char **evnp, void *image, size_t size,
const LaunchParameters &params);
+/// Return \p V aligned "upwards" according to \p Align.
+template <typename V, typename A> inline V align_up(V val, A align) {
+ return ((val + V(align) - 1) / V(align)) * V(align);
+}
+
/// Copy the system's argument vector to GPU memory allocated using \p alloc.
template <typename Allocator>
void *copy_argument_vector(int argc, char **argv, Allocator alloc) {
diff --git a/libc/utils/gpu/loader/Server.h b/libc/utils/gpu/loader/Server.h
index cd043359b1ea..6ffb32955c89 100644
--- a/libc/utils/gpu/loader/Server.h
+++ b/libc/utils/gpu/loader/Server.h
@@ -30,15 +30,19 @@ void handle_server() {
switch (port->get_opcode()) {
case __llvm_libc::rpc::Opcode::PRINT_TO_STDERR: {
- uint64_t str_size;
- char *str = nullptr;
- port->recv_n([&](uint64_t size) {
- str_size = size;
- str = new char[size];
- return str;
+ uint64_t str_size[__llvm_libc::rpc::MAX_LANE_SIZE] = {0};
+ char *strs[__llvm_libc::rpc::MAX_LANE_SIZE] = {nullptr};
+ port->recv_n([&](uint64_t size, uint32_t id) {
+ str_size[id] = size;
+ strs[id] = new char[size];
+ return strs[id];
});
- fwrite(str, str_size, 1, stderr);
- delete[] str;
+ for (uint64_t i = 0; i < __llvm_libc::rpc::MAX_LANE_SIZE; ++i) {
+ if (strs[i]) {
+ fwrite(strs[i], str_size[i], 1, stderr);
+ delete[] strs[i];
+ }
+ }
break;
}
case __llvm_libc::rpc::Opcode::EXIT: {
@@ -54,8 +58,7 @@ void handle_server() {
break;
}
default:
- port->recv([](__llvm_libc::rpc::Buffer *) { /* no-op */ });
- return;
+ port->recv([](__llvm_libc::rpc::Buffer *buffer) {});
}
port->close();
}
diff --git a/libc/utils/gpu/loader/amdgpu/Loader.cpp b/libc/utils/gpu/loader/amdgpu/Loader.cpp
index af5f00878e65..f9a7b75ff11b 100644
--- a/libc/utils/gpu/loader/amdgpu/Loader.cpp
+++ b/libc/utils/gpu/loader/amdgpu/Loader.cpp
@@ -287,6 +287,10 @@ int load(int argc, char **argv, char **envp, void *image, size_t size,
hsa_amd_memory_fill(dev_ret, 0, sizeof(int));
// Allocate finegrained memory for the RPC server and client to share.
+ uint32_t wavefront_size = 0;
+ if (hsa_status_t err = hsa_agent_get_info(
+ dev_agent, HSA_AGENT_INFO_WAVEFRONT_SIZE, &wavefront_size))
+ handle_error(err);
void *server_inbox;
void *server_outbox;
void *buffer;
@@ -299,7 +303,10 @@ int load(int argc, char **argv, char **envp, void *image, size_t size,
/*flags=*/0, &server_outbox))
handle_error(err);
if (hsa_status_t err = hsa_amd_memory_pool_allocate(
- finegrained_pool, sizeof(__llvm_libc::rpc::Buffer),
+ finegrained_pool,
+ align_up(sizeof(__llvm_libc::rpc::Header) +
+ (wavefront_size * sizeof(__llvm_libc::rpc::Buffer)),
+ alignof(__llvm_libc::rpc::Packet)),
/*flags=*/0, &buffer))
handle_error(err);
hsa_amd_agents_allow_access(1, &dev_agent, nullptr, server_inbox);
@@ -351,7 +358,7 @@ int load(int argc, char **argv, char **envp, void *image, size_t size,
handle_error(err);
// Initialize the RPC server's buffer for host-device communication.
- server.reset(&lock, server_inbox, server_outbox, buffer);
+ server.reset(wavefront_size, &lock, server_inbox, server_outbox, buffer);
// Initialize the packet header and set the doorbell signal to begin execution
// by the HSA runtime.
diff --git a/libc/utils/gpu/loader/nvptx/Loader.cpp b/libc/utils/gpu/loader/nvptx/Loader.cpp
index baf8baaff7cd..77e6967dd022 100644
--- a/libc/utils/gpu/loader/nvptx/Loader.cpp
+++ b/libc/utils/gpu/loader/nvptx/Loader.cpp
@@ -232,9 +232,13 @@ int load(int argc, char **argv, char **envp, void *image, size_t size,
if (CUresult err = cuMemsetD32(dev_ret, 0, 1))
handle_error(err);
+ uint32_t warp_size = 32;
void *server_inbox = allocator(sizeof(__llvm_libc::cpp::Atomic<int>));
void *server_outbox = allocator(sizeof(__llvm_libc::cpp::Atomic<int>));
- void *buffer = allocator(sizeof(__llvm_libc::rpc::Buffer));
+ void *buffer =
+ allocator(align_up(sizeof(__llvm_libc::rpc::Header) +
+ (warp_size * sizeof(__llvm_libc::rpc::Buffer)),
+ alignof(__llvm_libc::rpc::Packet)));
if (!server_inbox || !server_outbox || !buffer)
handle_error("Failed to allocate memory the RPC client / server.");
@@ -254,7 +258,7 @@ int load(int argc, char **argv, char **envp, void *image, size_t size,
CU_LAUNCH_PARAM_END};
// Initialize the RPC server's buffer for host-device communication.
- server.reset(&lock, server_inbox, server_outbox, buffer);
+ server.reset(warp_size, &lock, server_inbox, server_outbox, buffer);
// Call the kernel with the given arguments.
if (CUresult err = cuLaunchKernel(