summaryrefslogtreecommitdiff
path: root/libc/src/__support/RPC/rpc.h
diff options
context:
space:
mode:
Diffstat (limited to 'libc/src/__support/RPC/rpc.h')
-rw-r--r--libc/src/__support/RPC/rpc.h138
1 files changed, 115 insertions, 23 deletions
diff --git a/libc/src/__support/RPC/rpc.h b/libc/src/__support/RPC/rpc.h
index fc7a66f4b88d..170f74830b14 100644
--- a/libc/src/__support/RPC/rpc.h
+++ b/libc/src/__support/RPC/rpc.h
@@ -20,6 +20,7 @@
#include "rpc_util.h"
#include "src/__support/CPP/atomic.h"
+#include "src/__support/CPP/functional.h"
#include "src/__support/CPP/optional.h"
#include "src/__support/GPU/utils.h"
#include "src/string/memory_utils/memcpy_implementations.h"
@@ -38,12 +39,36 @@ enum Opcode : uint16_t {
};
/// A fixed size channel used to communicate between the RPC client and server.
-struct alignas(64) Buffer {
- uint8_t data[62];
- uint16_t opcode;
+struct Buffer {
+ uint64_t data[8];
};
static_assert(sizeof(Buffer) == 64, "Buffer size mismatch");
+/// The information associated with a packet. This indicates which operations to
+/// perform and which threads are active in the slots.
+struct Header {
+ uint64_t mask;
+ uint16_t opcode;
+};
+
+/// The data payload for the associated packet. We provide enough space for each
+/// thread in the cooperating lane to have a buffer.
+struct Payload {
+#if defined(LIBC_TARGET_ARCH_IS_GPU)
+ Buffer slot[gpu::LANE_SIZE];
+#else
+ // Flexible array size allocated at runtime to the appropriate size.
+ Buffer slot[];
+#endif
+};
+
+/// A packet used to share data between the client and server across an entire
+/// lane. We use a lane as the minimum granularity for execution.
+struct alignas(64) Packet {
+ Header header;
+ Payload payload;
+};
+
/// A common process used to synchronize communication between a client and a
/// server. The process contains an inbox and an outbox used for signaling
/// ownership of the shared buffer between both sides.
@@ -71,18 +96,21 @@ template <bool InvertInbox> struct Process {
LIBC_INLINE Process &operator=(const Process &) = default;
LIBC_INLINE ~Process() = default;
+ uint32_t lane_size;
cpp::Atomic<uint32_t> *lock;
cpp::Atomic<uint32_t> *inbox;
cpp::Atomic<uint32_t> *outbox;
- Buffer *buffer;
+ Packet *buffer;
/// Initialize the communication channels.
- LIBC_INLINE void reset(void *lock, void *inbox, void *outbox, void *buffer) {
+ LIBC_INLINE void reset(uint32_t lane_size, void *lock, void *inbox,
+ void *outbox, void *buffer) {
*this = {
+ lane_size,
reinterpret_cast<cpp::Atomic<uint32_t> *>(lock),
reinterpret_cast<cpp::Atomic<uint32_t> *>(inbox),
reinterpret_cast<cpp::Atomic<uint32_t> *>(outbox),
- reinterpret_cast<Buffer *>(buffer),
+ reinterpret_cast<Packet *>(buffer),
};
}
@@ -144,7 +172,8 @@ template <bool InvertInbox> struct Process {
return lane_mask != packed;
}
- // Unlock the lock at index.
+ /// Unlock the lock at index. We need a lane sync to keep this function
+ /// convergent, otherwise the compiler will sink the store and deadlock.
[[clang::convergent]] LIBC_INLINE void unlock(uint64_t lane_mask,
uint64_t index) {
// Wait for other threads in the warp to finish using the lock
@@ -156,6 +185,31 @@ template <bool InvertInbox> struct Process {
// warp dropping the lock again.
uint32_t and_mask = ~(rpc::is_first_lane(lane_mask) ? 1 : 0);
lock[index].fetch_and(and_mask, cpp::MemoryOrder::RELAXED);
+ gpu::sync_lane(lane_mask);
+ }
+
+ /// Invokes a function accross every active buffer across the total lane size.
+ LIBC_INLINE void invoke_rpc(cpp::function<void(Buffer *)> fn,
+ uint32_t index) {
+ if constexpr (is_process_gpu()) {
+ fn(&buffer[index].payload.slot[gpu::get_lane_id()]);
+ } else {
+ for (uint32_t i = 0; i < lane_size; i += gpu::get_lane_size())
+ if (buffer[index].header.mask & 1ul << i)
+ fn(&buffer[index].payload.slot[i]);
+ }
+ }
+
+ /// Alternate version that also provides the index of the current lane.
+ LIBC_INLINE void invoke_rpc(cpp::function<void(Buffer *, uint32_t)> fn,
+ uint32_t index) {
+ if constexpr (is_process_gpu()) {
+ fn(&buffer[index].payload.slot[gpu::get_lane_id()], gpu::get_lane_id());
+ } else {
+ for (uint32_t i = 0; i < lane_size; i += gpu::get_lane_size())
+ if (buffer[index].header.mask & 1ul << i)
+ fn(&buffer[index].payload.slot[i], i);
+ }
}
};
@@ -180,7 +234,7 @@ template <bool T> struct Port {
template <typename A> LIBC_INLINE void recv_n(A alloc);
LIBC_INLINE uint16_t get_opcode() const {
- return process.buffer[index].opcode;
+ return process.buffer[index].header.opcode;
}
LIBC_INLINE void close() { process.unlock(lane_mask, index); }
@@ -227,7 +281,7 @@ template <bool T> template <typename F> LIBC_INLINE void Port<T>::send(F fill) {
}
// Apply the \p fill function to initialize the buffer and release the memory.
- fill(&process.buffer[index]);
+ process.invoke_rpc(fill, index);
out = !out;
atomic_thread_fence(cpp::MemoryOrder::RELEASE);
process.outbox[index].store(out, cpp::MemoryOrder::RELAXED);
@@ -245,7 +299,7 @@ template <bool T> template <typename U> LIBC_INLINE void Port<T>::recv(U use) {
atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);
// Apply the \p use function to read the memory out of the buffer.
- use(&process.buffer[index]);
+ process.invoke_rpc(use, index);
out = !out;
process.outbox[index].store(out, cpp::MemoryOrder::RELAXED);
}
@@ -274,7 +328,10 @@ template <bool T>
LIBC_INLINE void Port<T>::send_n(const void *src, uint64_t size) {
// TODO: We could send the first bytes in this call and potentially save an
// extra send operation.
- send([=](Buffer *buffer) { buffer->data[0] = size; });
+ // TODO: We may need a way for the CPU to send different strings per thread.
+ send([=](Buffer *buffer) {
+ reinterpret_cast<uint64_t *>(buffer->data)[0] = size;
+ });
const uint8_t *ptr = reinterpret_cast<const uint8_t *>(src);
for (uint64_t idx = 0; idx < size; idx += sizeof(Buffer::data)) {
send([=](Buffer *buffer) {
@@ -283,6 +340,7 @@ LIBC_INLINE void Port<T>::send_n(const void *src, uint64_t size) {
inline_memcpy(buffer->data, ptr + idx, len);
});
}
+ gpu::sync_lane(process.buffer[index].header.mask);
}
/// Receives an arbitrarily sized data buffer across the shared channel in
@@ -291,15 +349,42 @@ LIBC_INLINE void Port<T>::send_n(const void *src, uint64_t size) {
template <bool T>
template <typename A>
LIBC_INLINE void Port<T>::recv_n(A alloc) {
- uint64_t size = 0;
- recv([&](Buffer *buffer) { size = buffer->data[0]; });
- uint8_t *dst = reinterpret_cast<uint8_t *>(alloc(size));
- for (uint64_t idx = 0; idx < size; idx += sizeof(Buffer::data)) {
- recv([=](Buffer *buffer) {
- uint64_t len =
- size - idx > sizeof(Buffer::data) ? sizeof(Buffer::data) : size - idx;
- inline_memcpy(dst + idx, buffer->data, len);
+ // The GPU handles thread private variables and masking implicitly through its
+ // execution model. If this is the CPU we need to manually handle the
+ // possibility that the sent data is of different length.
+ if constexpr (is_process_gpu()) {
+ uint64_t size = 0;
+ recv([&](Buffer *buffer) {
+ size = reinterpret_cast<uint64_t *>(buffer->data)[0];
+ });
+ uint8_t *dst = reinterpret_cast<uint8_t *>(alloc(size), gpu::get_lane_id());
+ for (uint64_t idx = 0; idx < size; idx += sizeof(Buffer::data)) {
+ recv([=](Buffer *buffer) {
+ uint64_t len = size - idx > sizeof(Buffer::data) ? sizeof(Buffer::data)
+ : size - idx;
+ inline_memcpy(dst + idx, buffer->data, len);
+ });
+ }
+ return;
+ } else {
+ uint64_t size[MAX_LANE_SIZE];
+ uint8_t *dst[MAX_LANE_SIZE];
+ uint64_t max = 0;
+ recv([&](Buffer *buffer, uint32_t id) {
+ size[id] = reinterpret_cast<uint64_t *>(buffer->data)[0];
+ dst[id] = reinterpret_cast<uint8_t *>(alloc(size[id], id));
+ max = size[id] > max ? size[id] : max;
});
+ for (uint64_t idx = 0; idx < max; idx += sizeof(Buffer::data)) {
+ recv([=](Buffer *buffer, uint32_t id) {
+ uint64_t len = size[id] - idx > sizeof(Buffer::data)
+ ? sizeof(Buffer::data)
+ : size[id] - idx;
+ if (idx < size[id])
+ inline_memcpy(dst[id] + idx, buffer->data, len);
+ });
+ }
+ return;
}
}
@@ -307,7 +392,10 @@ LIBC_INLINE void Port<T>::recv_n(A alloc) {
/// port if we find an index that is in a valid sending state. That is, there
/// are send operations pending that haven't been serviced on this port. Each
/// port instance uses an associated \p opcode to tell the server what to do.
-LIBC_INLINE cpp::optional<Client::Port> Client::try_open(uint16_t opcode) {
+/// Opening a port is only valid if the `opcode` is the sam accross every
+/// participating thread.
+[[clang::convergent]] LIBC_INLINE cpp::optional<Client::Port>
+Client::try_open(uint16_t opcode) {
constexpr uint64_t index = 0;
const uint64_t lane_mask = gpu::get_lane_mask();
@@ -323,13 +411,16 @@ LIBC_INLINE cpp::optional<Client::Port> Client::try_open(uint16_t opcode) {
// Once we acquire the index we need to check if we are in a valid sending
// state.
-
if (buffer_unavailable(in, out)) {
unlock(lane_mask, index);
return cpp::nullopt;
}
- buffer->opcode = opcode;
+ if (is_first_lane(lane_mask)) {
+ buffer[index].header.opcode = opcode;
+ buffer[index].header.mask = lane_mask;
+ }
+ gpu::sync_lane(lane_mask);
return Port(*this, lane_mask, index, out);
}
@@ -343,7 +434,8 @@ LIBC_INLINE Client::Port Client::open(uint16_t opcode) {
/// Attempts to open a port to use as the server. The server can only open a
/// port if it has a pending receive operation
-LIBC_INLINE cpp::optional<Server::Port> Server::try_open() {
+[[clang::convergent]] LIBC_INLINE cpp::optional<Server::Port>
+Server::try_open() {
constexpr uint64_t index = 0;
const uint64_t lane_mask = gpu::get_lane_mask();