summaryrefslogtreecommitdiff
path: root/libc/src
diff options
context:
space:
mode:
authorJoseph Huber <jhuber6@vols.utk.edu>2023-05-04 14:53:28 -0500
committerJoseph Huber <jhuber6@vols.utk.edu>2023-05-04 19:31:41 -0500
commit507edb52f9a9a5c1ab2a92ec2e291a7b63c3fbff (patch)
treedcd9f8ef610af4a60ead26e721c5d3aead79777b /libc/src
parentfe9f557578a565ed01faf75cd07ea4d9b75feeb1 (diff)
downloadllvm-507edb52f9a9a5c1ab2a92ec2e291a7b63c3fbff.tar.gz
[libc] Enable multiple threads to use RPC on the GPU
The execution model of the GPU expects that groups of threads will execute in lock-step in SIMD fashion. It's both important for performance and correctness that we treat this as the smallest possible granularity for an RPC operation. Thus, we map multiple threads to a single larger buffer and ship that across the wire. This patch makes the necessary changes to support executing the RPC on the GPU with multiple threads. This requires some workarounds to mimic the model when handling the protocol from the CPU. I'm not completely happy with some of the workarounds required, but I think it should work. Uses some of the implementation details from D148191. Reviewed By: JonChesterfield Differential Revision: https://reviews.llvm.org/D148943
Diffstat (limited to 'libc/src')
-rw-r--r--libc/src/__support/RPC/CMakeLists.txt2
-rw-r--r--libc/src/__support/RPC/rpc.h138
-rw-r--r--libc/src/__support/RPC/rpc_util.h12
3 files changed, 129 insertions, 23 deletions
diff --git a/libc/src/__support/RPC/CMakeLists.txt b/libc/src/__support/RPC/CMakeLists.txt
index 1ac2a3b0548c..96bfd8e72954 100644
--- a/libc/src/__support/RPC/CMakeLists.txt
+++ b/libc/src/__support/RPC/CMakeLists.txt
@@ -10,6 +10,8 @@ add_header_library(
DEPENDS
libc.src.__support.common
libc.src.__support.CPP.atomic
+ libc.src.__support.CPP.optional
+ libc.src.__support.CPP.functional
libc.src.__support.GPU.utils
)
diff --git a/libc/src/__support/RPC/rpc.h b/libc/src/__support/RPC/rpc.h
index fc7a66f4b88d..170f74830b14 100644
--- a/libc/src/__support/RPC/rpc.h
+++ b/libc/src/__support/RPC/rpc.h
@@ -20,6 +20,7 @@
#include "rpc_util.h"
#include "src/__support/CPP/atomic.h"
+#include "src/__support/CPP/functional.h"
#include "src/__support/CPP/optional.h"
#include "src/__support/GPU/utils.h"
#include "src/string/memory_utils/memcpy_implementations.h"
@@ -38,12 +39,36 @@ enum Opcode : uint16_t {
};
/// A fixed size channel used to communicate between the RPC client and server.
-struct alignas(64) Buffer {
- uint8_t data[62];
- uint16_t opcode;
+struct Buffer {
+ uint64_t data[8];
};
static_assert(sizeof(Buffer) == 64, "Buffer size mismatch");
+/// The information associated with a packet. This indicates which operations to
+/// perform and which threads are active in the slots.
+struct Header {
+ uint64_t mask;
+ uint16_t opcode;
+};
+
+/// The data payload for the associated packet. We provide enough space for each
+/// thread in the cooperating lane to have a buffer.
+struct Payload {
+#if defined(LIBC_TARGET_ARCH_IS_GPU)
+ Buffer slot[gpu::LANE_SIZE];
+#else
+ // Flexible array size allocated at runtime to the appropriate size.
+ Buffer slot[];
+#endif
+};
+
+/// A packet used to share data between the client and server across an entire
+/// lane. We use a lane as the minimum granularity for execution.
+struct alignas(64) Packet {
+ Header header;
+ Payload payload;
+};
+
/// A common process used to synchronize communication between a client and a
/// server. The process contains an inbox and an outbox used for signaling
/// ownership of the shared buffer between both sides.
@@ -71,18 +96,21 @@ template <bool InvertInbox> struct Process {
LIBC_INLINE Process &operator=(const Process &) = default;
LIBC_INLINE ~Process() = default;
+ uint32_t lane_size;
cpp::Atomic<uint32_t> *lock;
cpp::Atomic<uint32_t> *inbox;
cpp::Atomic<uint32_t> *outbox;
- Buffer *buffer;
+ Packet *buffer;
/// Initialize the communication channels.
- LIBC_INLINE void reset(void *lock, void *inbox, void *outbox, void *buffer) {
+ LIBC_INLINE void reset(uint32_t lane_size, void *lock, void *inbox,
+ void *outbox, void *buffer) {
*this = {
+ lane_size,
reinterpret_cast<cpp::Atomic<uint32_t> *>(lock),
reinterpret_cast<cpp::Atomic<uint32_t> *>(inbox),
reinterpret_cast<cpp::Atomic<uint32_t> *>(outbox),
- reinterpret_cast<Buffer *>(buffer),
+ reinterpret_cast<Packet *>(buffer),
};
}
@@ -144,7 +172,8 @@ template <bool InvertInbox> struct Process {
return lane_mask != packed;
}
- // Unlock the lock at index.
+ /// Unlock the lock at index. We need a lane sync to keep this function
+ /// convergent, otherwise the compiler will sink the store and deadlock.
[[clang::convergent]] LIBC_INLINE void unlock(uint64_t lane_mask,
uint64_t index) {
// Wait for other threads in the warp to finish using the lock
@@ -156,6 +185,31 @@ template <bool InvertInbox> struct Process {
// warp dropping the lock again.
uint32_t and_mask = ~(rpc::is_first_lane(lane_mask) ? 1 : 0);
lock[index].fetch_and(and_mask, cpp::MemoryOrder::RELAXED);
+ gpu::sync_lane(lane_mask);
+ }
+
+ /// Invokes a function accross every active buffer across the total lane size.
+ LIBC_INLINE void invoke_rpc(cpp::function<void(Buffer *)> fn,
+ uint32_t index) {
+ if constexpr (is_process_gpu()) {
+ fn(&buffer[index].payload.slot[gpu::get_lane_id()]);
+ } else {
+ for (uint32_t i = 0; i < lane_size; i += gpu::get_lane_size())
+ if (buffer[index].header.mask & 1ul << i)
+ fn(&buffer[index].payload.slot[i]);
+ }
+ }
+
+ /// Alternate version that also provides the index of the current lane.
+ LIBC_INLINE void invoke_rpc(cpp::function<void(Buffer *, uint32_t)> fn,
+ uint32_t index) {
+ if constexpr (is_process_gpu()) {
+ fn(&buffer[index].payload.slot[gpu::get_lane_id()], gpu::get_lane_id());
+ } else {
+ for (uint32_t i = 0; i < lane_size; i += gpu::get_lane_size())
+ if (buffer[index].header.mask & 1ul << i)
+ fn(&buffer[index].payload.slot[i], i);
+ }
}
};
@@ -180,7 +234,7 @@ template <bool T> struct Port {
template <typename A> LIBC_INLINE void recv_n(A alloc);
LIBC_INLINE uint16_t get_opcode() const {
- return process.buffer[index].opcode;
+ return process.buffer[index].header.opcode;
}
LIBC_INLINE void close() { process.unlock(lane_mask, index); }
@@ -227,7 +281,7 @@ template <bool T> template <typename F> LIBC_INLINE void Port<T>::send(F fill) {
}
// Apply the \p fill function to initialize the buffer and release the memory.
- fill(&process.buffer[index]);
+ process.invoke_rpc(fill, index);
out = !out;
atomic_thread_fence(cpp::MemoryOrder::RELEASE);
process.outbox[index].store(out, cpp::MemoryOrder::RELAXED);
@@ -245,7 +299,7 @@ template <bool T> template <typename U> LIBC_INLINE void Port<T>::recv(U use) {
atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);
// Apply the \p use function to read the memory out of the buffer.
- use(&process.buffer[index]);
+ process.invoke_rpc(use, index);
out = !out;
process.outbox[index].store(out, cpp::MemoryOrder::RELAXED);
}
@@ -274,7 +328,10 @@ template <bool T>
LIBC_INLINE void Port<T>::send_n(const void *src, uint64_t size) {
// TODO: We could send the first bytes in this call and potentially save an
// extra send operation.
- send([=](Buffer *buffer) { buffer->data[0] = size; });
+ // TODO: We may need a way for the CPU to send different strings per thread.
+ send([=](Buffer *buffer) {
+ reinterpret_cast<uint64_t *>(buffer->data)[0] = size;
+ });
const uint8_t *ptr = reinterpret_cast<const uint8_t *>(src);
for (uint64_t idx = 0; idx < size; idx += sizeof(Buffer::data)) {
send([=](Buffer *buffer) {
@@ -283,6 +340,7 @@ LIBC_INLINE void Port<T>::send_n(const void *src, uint64_t size) {
inline_memcpy(buffer->data, ptr + idx, len);
});
}
+ gpu::sync_lane(process.buffer[index].header.mask);
}
/// Receives an arbitrarily sized data buffer across the shared channel in
@@ -291,15 +349,42 @@ LIBC_INLINE void Port<T>::send_n(const void *src, uint64_t size) {
template <bool T>
template <typename A>
LIBC_INLINE void Port<T>::recv_n(A alloc) {
- uint64_t size = 0;
- recv([&](Buffer *buffer) { size = buffer->data[0]; });
- uint8_t *dst = reinterpret_cast<uint8_t *>(alloc(size));
- for (uint64_t idx = 0; idx < size; idx += sizeof(Buffer::data)) {
- recv([=](Buffer *buffer) {
- uint64_t len =
- size - idx > sizeof(Buffer::data) ? sizeof(Buffer::data) : size - idx;
- inline_memcpy(dst + idx, buffer->data, len);
+ // The GPU handles thread private variables and masking implicitly through its
+ // execution model. If this is the CPU we need to manually handle the
+ // possibility that the sent data is of different length.
+ if constexpr (is_process_gpu()) {
+ uint64_t size = 0;
+ recv([&](Buffer *buffer) {
+ size = reinterpret_cast<uint64_t *>(buffer->data)[0];
+ });
+ uint8_t *dst = reinterpret_cast<uint8_t *>(alloc(size), gpu::get_lane_id());
+ for (uint64_t idx = 0; idx < size; idx += sizeof(Buffer::data)) {
+ recv([=](Buffer *buffer) {
+ uint64_t len = size - idx > sizeof(Buffer::data) ? sizeof(Buffer::data)
+ : size - idx;
+ inline_memcpy(dst + idx, buffer->data, len);
+ });
+ }
+ return;
+ } else {
+ uint64_t size[MAX_LANE_SIZE];
+ uint8_t *dst[MAX_LANE_SIZE];
+ uint64_t max = 0;
+ recv([&](Buffer *buffer, uint32_t id) {
+ size[id] = reinterpret_cast<uint64_t *>(buffer->data)[0];
+ dst[id] = reinterpret_cast<uint8_t *>(alloc(size[id], id));
+ max = size[id] > max ? size[id] : max;
});
+ for (uint64_t idx = 0; idx < max; idx += sizeof(Buffer::data)) {
+ recv([=](Buffer *buffer, uint32_t id) {
+ uint64_t len = size[id] - idx > sizeof(Buffer::data)
+ ? sizeof(Buffer::data)
+ : size[id] - idx;
+ if (idx < size[id])
+ inline_memcpy(dst[id] + idx, buffer->data, len);
+ });
+ }
+ return;
}
}
@@ -307,7 +392,10 @@ LIBC_INLINE void Port<T>::recv_n(A alloc) {
/// port if we find an index that is in a valid sending state. That is, there
/// are send operations pending that haven't been serviced on this port. Each
/// port instance uses an associated \p opcode to tell the server what to do.
-LIBC_INLINE cpp::optional<Client::Port> Client::try_open(uint16_t opcode) {
+/// Opening a port is only valid if the `opcode` is the sam accross every
+/// participating thread.
+[[clang::convergent]] LIBC_INLINE cpp::optional<Client::Port>
+Client::try_open(uint16_t opcode) {
constexpr uint64_t index = 0;
const uint64_t lane_mask = gpu::get_lane_mask();
@@ -323,13 +411,16 @@ LIBC_INLINE cpp::optional<Client::Port> Client::try_open(uint16_t opcode) {
// Once we acquire the index we need to check if we are in a valid sending
// state.
-
if (buffer_unavailable(in, out)) {
unlock(lane_mask, index);
return cpp::nullopt;
}
- buffer->opcode = opcode;
+ if (is_first_lane(lane_mask)) {
+ buffer[index].header.opcode = opcode;
+ buffer[index].header.mask = lane_mask;
+ }
+ gpu::sync_lane(lane_mask);
return Port(*this, lane_mask, index, out);
}
@@ -343,7 +434,8 @@ LIBC_INLINE Client::Port Client::open(uint16_t opcode) {
/// Attempts to open a port to use as the server. The server can only open a
/// port if it has a pending receive operation
-LIBC_INLINE cpp::optional<Server::Port> Server::try_open() {
+[[clang::convergent]] LIBC_INLINE cpp::optional<Server::Port>
+Server::try_open() {
constexpr uint64_t index = 0;
const uint64_t lane_mask = gpu::get_lane_mask();
diff --git a/libc/src/__support/RPC/rpc_util.h b/libc/src/__support/RPC/rpc_util.h
index 349a5317bf4e..224723ef20c9 100644
--- a/libc/src/__support/RPC/rpc_util.h
+++ b/libc/src/__support/RPC/rpc_util.h
@@ -16,6 +16,9 @@
namespace __llvm_libc {
namespace rpc {
+/// Maximum amount of data a single lane can use.
+constexpr uint64_t MAX_LANE_SIZE = 64;
+
/// Suspend the thread briefly to assist the thread scheduler during busy loops.
LIBC_INLINE void sleep_briefly() {
#if defined(LIBC_TARGET_ARCH_IS_NVPTX) && __CUDA_ARCH__ >= 700
@@ -37,6 +40,15 @@ LIBC_INLINE bool is_first_lane(uint64_t lane_mask) {
return gpu::get_lane_id() == get_first_lane_id(lane_mask);
}
+/// Conditional to indicate if this process is running on the GPU.
+LIBC_INLINE constexpr bool is_process_gpu() {
+#if defined(LIBC_TARGET_ARCH_IS_GPU)
+ return true;
+#else
+ return false;
+#endif
+}
+
} // namespace rpc
} // namespace __llvm_libc