diff options
Diffstat (limited to 'libc/src/__support/RPC/rpc.h')
-rw-r--r-- | libc/src/__support/RPC/rpc.h | 138 |
1 files changed, 115 insertions, 23 deletions
diff --git a/libc/src/__support/RPC/rpc.h b/libc/src/__support/RPC/rpc.h index fc7a66f4b88d..170f74830b14 100644 --- a/libc/src/__support/RPC/rpc.h +++ b/libc/src/__support/RPC/rpc.h @@ -20,6 +20,7 @@ #include "rpc_util.h" #include "src/__support/CPP/atomic.h" +#include "src/__support/CPP/functional.h" #include "src/__support/CPP/optional.h" #include "src/__support/GPU/utils.h" #include "src/string/memory_utils/memcpy_implementations.h" @@ -38,12 +39,36 @@ enum Opcode : uint16_t { }; /// A fixed size channel used to communicate between the RPC client and server. -struct alignas(64) Buffer { - uint8_t data[62]; - uint16_t opcode; +struct Buffer { + uint64_t data[8]; }; static_assert(sizeof(Buffer) == 64, "Buffer size mismatch"); +/// The information associated with a packet. This indicates which operations to +/// perform and which threads are active in the slots. +struct Header { + uint64_t mask; + uint16_t opcode; +}; + +/// The data payload for the associated packet. We provide enough space for each +/// thread in the cooperating lane to have a buffer. +struct Payload { +#if defined(LIBC_TARGET_ARCH_IS_GPU) + Buffer slot[gpu::LANE_SIZE]; +#else + // Flexible array size allocated at runtime to the appropriate size. + Buffer slot[]; +#endif +}; + +/// A packet used to share data between the client and server across an entire +/// lane. We use a lane as the minimum granularity for execution. +struct alignas(64) Packet { + Header header; + Payload payload; +}; + /// A common process used to synchronize communication between a client and a /// server. The process contains an inbox and an outbox used for signaling /// ownership of the shared buffer between both sides. @@ -71,18 +96,21 @@ template <bool InvertInbox> struct Process { LIBC_INLINE Process &operator=(const Process &) = default; LIBC_INLINE ~Process() = default; + uint32_t lane_size; cpp::Atomic<uint32_t> *lock; cpp::Atomic<uint32_t> *inbox; cpp::Atomic<uint32_t> *outbox; - Buffer *buffer; + Packet *buffer; /// Initialize the communication channels. - LIBC_INLINE void reset(void *lock, void *inbox, void *outbox, void *buffer) { + LIBC_INLINE void reset(uint32_t lane_size, void *lock, void *inbox, + void *outbox, void *buffer) { *this = { + lane_size, reinterpret_cast<cpp::Atomic<uint32_t> *>(lock), reinterpret_cast<cpp::Atomic<uint32_t> *>(inbox), reinterpret_cast<cpp::Atomic<uint32_t> *>(outbox), - reinterpret_cast<Buffer *>(buffer), + reinterpret_cast<Packet *>(buffer), }; } @@ -144,7 +172,8 @@ template <bool InvertInbox> struct Process { return lane_mask != packed; } - // Unlock the lock at index. + /// Unlock the lock at index. We need a lane sync to keep this function + /// convergent, otherwise the compiler will sink the store and deadlock. [[clang::convergent]] LIBC_INLINE void unlock(uint64_t lane_mask, uint64_t index) { // Wait for other threads in the warp to finish using the lock @@ -156,6 +185,31 @@ template <bool InvertInbox> struct Process { // warp dropping the lock again. uint32_t and_mask = ~(rpc::is_first_lane(lane_mask) ? 1 : 0); lock[index].fetch_and(and_mask, cpp::MemoryOrder::RELAXED); + gpu::sync_lane(lane_mask); + } + + /// Invokes a function accross every active buffer across the total lane size. + LIBC_INLINE void invoke_rpc(cpp::function<void(Buffer *)> fn, + uint32_t index) { + if constexpr (is_process_gpu()) { + fn(&buffer[index].payload.slot[gpu::get_lane_id()]); + } else { + for (uint32_t i = 0; i < lane_size; i += gpu::get_lane_size()) + if (buffer[index].header.mask & 1ul << i) + fn(&buffer[index].payload.slot[i]); + } + } + + /// Alternate version that also provides the index of the current lane. + LIBC_INLINE void invoke_rpc(cpp::function<void(Buffer *, uint32_t)> fn, + uint32_t index) { + if constexpr (is_process_gpu()) { + fn(&buffer[index].payload.slot[gpu::get_lane_id()], gpu::get_lane_id()); + } else { + for (uint32_t i = 0; i < lane_size; i += gpu::get_lane_size()) + if (buffer[index].header.mask & 1ul << i) + fn(&buffer[index].payload.slot[i], i); + } } }; @@ -180,7 +234,7 @@ template <bool T> struct Port { template <typename A> LIBC_INLINE void recv_n(A alloc); LIBC_INLINE uint16_t get_opcode() const { - return process.buffer[index].opcode; + return process.buffer[index].header.opcode; } LIBC_INLINE void close() { process.unlock(lane_mask, index); } @@ -227,7 +281,7 @@ template <bool T> template <typename F> LIBC_INLINE void Port<T>::send(F fill) { } // Apply the \p fill function to initialize the buffer and release the memory. - fill(&process.buffer[index]); + process.invoke_rpc(fill, index); out = !out; atomic_thread_fence(cpp::MemoryOrder::RELEASE); process.outbox[index].store(out, cpp::MemoryOrder::RELAXED); @@ -245,7 +299,7 @@ template <bool T> template <typename U> LIBC_INLINE void Port<T>::recv(U use) { atomic_thread_fence(cpp::MemoryOrder::ACQUIRE); // Apply the \p use function to read the memory out of the buffer. - use(&process.buffer[index]); + process.invoke_rpc(use, index); out = !out; process.outbox[index].store(out, cpp::MemoryOrder::RELAXED); } @@ -274,7 +328,10 @@ template <bool T> LIBC_INLINE void Port<T>::send_n(const void *src, uint64_t size) { // TODO: We could send the first bytes in this call and potentially save an // extra send operation. - send([=](Buffer *buffer) { buffer->data[0] = size; }); + // TODO: We may need a way for the CPU to send different strings per thread. + send([=](Buffer *buffer) { + reinterpret_cast<uint64_t *>(buffer->data)[0] = size; + }); const uint8_t *ptr = reinterpret_cast<const uint8_t *>(src); for (uint64_t idx = 0; idx < size; idx += sizeof(Buffer::data)) { send([=](Buffer *buffer) { @@ -283,6 +340,7 @@ LIBC_INLINE void Port<T>::send_n(const void *src, uint64_t size) { inline_memcpy(buffer->data, ptr + idx, len); }); } + gpu::sync_lane(process.buffer[index].header.mask); } /// Receives an arbitrarily sized data buffer across the shared channel in @@ -291,15 +349,42 @@ LIBC_INLINE void Port<T>::send_n(const void *src, uint64_t size) { template <bool T> template <typename A> LIBC_INLINE void Port<T>::recv_n(A alloc) { - uint64_t size = 0; - recv([&](Buffer *buffer) { size = buffer->data[0]; }); - uint8_t *dst = reinterpret_cast<uint8_t *>(alloc(size)); - for (uint64_t idx = 0; idx < size; idx += sizeof(Buffer::data)) { - recv([=](Buffer *buffer) { - uint64_t len = - size - idx > sizeof(Buffer::data) ? sizeof(Buffer::data) : size - idx; - inline_memcpy(dst + idx, buffer->data, len); + // The GPU handles thread private variables and masking implicitly through its + // execution model. If this is the CPU we need to manually handle the + // possibility that the sent data is of different length. + if constexpr (is_process_gpu()) { + uint64_t size = 0; + recv([&](Buffer *buffer) { + size = reinterpret_cast<uint64_t *>(buffer->data)[0]; + }); + uint8_t *dst = reinterpret_cast<uint8_t *>(alloc(size), gpu::get_lane_id()); + for (uint64_t idx = 0; idx < size; idx += sizeof(Buffer::data)) { + recv([=](Buffer *buffer) { + uint64_t len = size - idx > sizeof(Buffer::data) ? sizeof(Buffer::data) + : size - idx; + inline_memcpy(dst + idx, buffer->data, len); + }); + } + return; + } else { + uint64_t size[MAX_LANE_SIZE]; + uint8_t *dst[MAX_LANE_SIZE]; + uint64_t max = 0; + recv([&](Buffer *buffer, uint32_t id) { + size[id] = reinterpret_cast<uint64_t *>(buffer->data)[0]; + dst[id] = reinterpret_cast<uint8_t *>(alloc(size[id], id)); + max = size[id] > max ? size[id] : max; }); + for (uint64_t idx = 0; idx < max; idx += sizeof(Buffer::data)) { + recv([=](Buffer *buffer, uint32_t id) { + uint64_t len = size[id] - idx > sizeof(Buffer::data) + ? sizeof(Buffer::data) + : size[id] - idx; + if (idx < size[id]) + inline_memcpy(dst[id] + idx, buffer->data, len); + }); + } + return; } } @@ -307,7 +392,10 @@ LIBC_INLINE void Port<T>::recv_n(A alloc) { /// port if we find an index that is in a valid sending state. That is, there /// are send operations pending that haven't been serviced on this port. Each /// port instance uses an associated \p opcode to tell the server what to do. -LIBC_INLINE cpp::optional<Client::Port> Client::try_open(uint16_t opcode) { +/// Opening a port is only valid if the `opcode` is the sam accross every +/// participating thread. +[[clang::convergent]] LIBC_INLINE cpp::optional<Client::Port> +Client::try_open(uint16_t opcode) { constexpr uint64_t index = 0; const uint64_t lane_mask = gpu::get_lane_mask(); @@ -323,13 +411,16 @@ LIBC_INLINE cpp::optional<Client::Port> Client::try_open(uint16_t opcode) { // Once we acquire the index we need to check if we are in a valid sending // state. - if (buffer_unavailable(in, out)) { unlock(lane_mask, index); return cpp::nullopt; } - buffer->opcode = opcode; + if (is_first_lane(lane_mask)) { + buffer[index].header.opcode = opcode; + buffer[index].header.mask = lane_mask; + } + gpu::sync_lane(lane_mask); return Port(*this, lane_mask, index, out); } @@ -343,7 +434,8 @@ LIBC_INLINE Client::Port Client::open(uint16_t opcode) { /// Attempts to open a port to use as the server. The server can only open a /// port if it has a pending receive operation -LIBC_INLINE cpp::optional<Server::Port> Server::try_open() { +[[clang::convergent]] LIBC_INLINE cpp::optional<Server::Port> +Server::try_open() { constexpr uint64_t index = 0; const uint64_t lane_mask = gpu::get_lane_mask(); |